wolfssl/scripts/multi-msg-record.py
Juliusz Sosinowicz 39642d5ad3 Skip multi-msg-record test when wolfSSL built without RSA
The test certs are RSA; if NO_RSA is defined the client can neither
load nor verify them. Detect "RSA not supported" in client -? help
and exit 77 (SKIP) before tlslite-ng tries to use the RSA chain.
2026-05-14 13:10:13 +02:00

655 lines
24 KiB
Python
Executable file
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
#
# multi-msg-record.py
#
# Python half of scripts/multi-msg-record.test (the bash wrapper handles
# NETWORK_UNSHARE_HELPER / AM_BWRAPPED and the python3 availability
# check, then execs this script).
#
# Tests that wolfSSL correctly processes TLS records containing multiple
# handshake messages packed into a single record.
#
# Uses tlslite-ng as the TLS peer to craft multi-message records:
#
# TLS 1.2 Each connection tests TWO code paths back-to-back:
# 1. Initial handshake: RecordMergingSocket rewrites separate
# plaintext ServerHello + Certificate + ServerKeyExchange +
# ServerHelloDone records into one multi-message TLS
# record before forwarding to the wolfSSL client.
# 2. Renegotiation on the same connection: tlslite-ng is
# monkey-patched to coalesce SH+Cert+SKE+SHD into ONE
# encrypted handshake record (exercises the
# curSize -= padSz CBC-padding path and the AEAD path).
#
# TLS 1.3 tlslite-ng's _queue_message / _queue_flush mechanism already
# coalesces EncryptedExtensions + Certificate + CertificateVerify
# + Finished into a single encrypted record. The test verifies
# that wolfSSL parses this correctly.
#
# Multiple cipher suites are tested for both protocol versions.
#
# Requirements: python3, tlslite-ng (pip install tlslite-ng)
import socket
import struct
import subprocess
import os
import sys
import threading
import time
import types
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
WOLFSSL_DIR = os.path.dirname(SCRIPT_DIR)
WOLF_CLIENT = os.path.join(WOLFSSL_DIR, "examples", "client", "client")
CERT_DIR = os.path.join(WOLFSSL_DIR, "certs")
# CA cert path passed to the wolfSSL client via -A. Set in main() after
# detect_wolf_features() determines whether the build accepts PEM or DER.
WOLF_CA_CERT = os.path.join(CERT_DIR, "ca-cert.pem")
# ---------------------------------------------------------------------------
# Bypass a strict tlslite-ng validation that rejects wolfSSL's ClientHello
# when the client advertises FFDHE groups in a TLS-1.3-only hello.
# This must happen before importing TLSConnection.
#
# If tlslite-ng isn't installed we exit 77 so automake marks the test
# SKIPped instead of FAILed.
# ---------------------------------------------------------------------------
try:
import tlslite.tlsconnection # noqa: E402
import tlslite.recordlayer # noqa: E402
tlslite.tlsconnection.TLS_1_3_FORBIDDEN_GROUPS = frozenset()
from tlslite import ( # noqa: E402
TLSConnection, HandshakeSettings, X509CertChain, parsePEMKey,
)
from tlslite.constants import ContentType # noqa: E402
from tlslite.extensions import RenegotiationInfoExtension # noqa: E402
from tlslite.constants import ExtensionType # noqa: E402
from tlslite.messages import HelloMessage, Message as TLSMessage # noqa: E402
except ImportError as e:
sys.stdout.write(
"tlslite-ng not installed ({}); skipping multi-msg-record test\n"
" (install with: pip install tlslite-ng)\n".format(e))
sys.exit(77)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
HS_NAMES = {
2: "SH", 4: "NST", 8: "EE", 11: "Cert", 12: "SKE",
13: "CR", 14: "SHD", 15: "CV", 16: "CKE", 20: "Fin",
}
PASS_COUNT = 0
FAIL_COUNT = 0
SKIP_COUNT = 0
def passed(label):
global PASS_COUNT
PASS_COUNT += 1
print(f" PASS: {label}")
def failed(label):
global FAIL_COUNT
FAIL_COUNT += 1
print(f" FAIL: {label}")
def skipped(label):
global SKIP_COUNT
SKIP_COUNT += 1
print(f" SKIP: {label}")
def detect_wolf_features():
"""Probe the wolfSSL client binary to find which features are
compiled in. Used to decide which test phases to run.
Returns dict with keys: tls12 (bool), tls13 (bool),
secure_reneg (bool), rsa (bool), ciphers (set[str]), ca_cert (str).
"""
feats = {"tls12": False, "tls13": False, "secure_reneg": False,
"rsa": True,
"ciphers": set(),
"ca_cert": os.path.join(CERT_DIR, "ca-cert.pem")}
# ./client -V -> e.g. "3:4:d(downgrade):e(either):"
try:
r = subprocess.run([WOLF_CLIENT, "-V"],
capture_output=True, timeout=5)
parts = r.stdout.decode("utf-8", errors="replace").strip().split(":")
feats["tls12"] = "3" in parts
feats["tls13"] = "4" in parts
except Exception:
pass
# ./client -? -> help text includes "-R" only when
# HAVE_SECURE_RENEGOTIATION is defined. The default -A path
# ("ca-cert.pem" vs "ca-cert.der") also tells us which CA file
# format the build can load. The RSA key-size line reports
# "RSA not supported" when NO_RSA is defined.
try:
r = subprocess.run([WOLF_CLIENT, "-?"],
capture_output=True, timeout=5)
htxt = r.stdout.decode("utf-8", errors="replace")
feats["secure_reneg"] = ("Allow Secure Renegotiation" in htxt)
if "ca-cert.der" in htxt and "ca-cert.pem" not in htxt:
feats["ca_cert"] = os.path.join(CERT_DIR, "ca-cert.der")
if "RSA not supported" in htxt:
feats["rsa"] = False
except Exception:
pass
# ./client -e -> colon-separated list of supported cipher suites.
try:
r = subprocess.run([WOLF_CLIENT, "-e"],
capture_output=True, timeout=5)
ctxt = r.stdout.decode("utf-8", errors="replace").strip()
feats["ciphers"] = {c for c in ctxt.split(":") if c}
except Exception:
pass
return feats
def _load_chain(cert_file):
with open(cert_file) as f:
chain = X509CertChain()
chain.parsePemList(f.read())
return chain
def _load_key(key_file):
with open(key_file) as f:
return parsePEMKey(f.read(), private=True)
def _parse_hs_types(data):
"""Parse handshake message types from raw handshake content."""
msgs = []
off = 0
while off + 4 <= len(data):
ht = data[off]
hl = struct.unpack("!I", b"\x00" + bytes(data[off + 1 : off + 4]))[0]
msgs.append(HS_NAMES.get(ht, f"T{ht}"))
off += 4 + hl
return msgs
def _get_free_port():
"""Get an available TCP port."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
def _listen_socket():
"""Bind a listening TCP socket on localhost with the standard test timeout."""
port = _get_free_port()
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind(("127.0.0.1", port))
srv.listen(1)
srv.settimeout(15)
return srv, port
def _run_wolf_client(port, version, cipher, extra=()):
"""Invoke the wolfSSL example client against 127.0.0.1:port.
WOLF_CA_CERT is PEM or DER depending on the build (NO_CODING /
OPENSSL_EXTRA builds don't both support PEM).
"""
cmd = [WOLF_CLIENT, "-h", "127.0.0.1", "-p", str(port),
"-v", version, "-A", WOLF_CA_CERT,
"-g", *extra]
if cipher:
cmd.extend(["-l", cipher])
return subprocess.run(cmd, capture_output=True, timeout=15)
class _SendRecordTrace:
"""Context manager that wraps RecordLayer.sendRecord to log every record."""
def __init__(self):
self.log = []
self._orig = None
def __enter__(self):
self._orig = tlslite.recordlayer.RecordLayer.sendRecord
log = self.log
orig = self._orig
def wrapper(self_rl, msg):
data = msg.write()
ct = msg.contentType
encrypted = bool(self_rl._writeState
and self_rl._writeState.encContext)
hs_msgs = []
if ct == ContentType.handshake:
hs_msgs = _parse_hs_types(data)
log.append((ct, encrypted, len(data), hs_msgs))
yield from orig(self_rl, msg)
tlslite.recordlayer.RecordLayer.sendRecord = wrapper
return self.log
def __exit__(self, *exc):
tlslite.recordlayer.RecordLayer.sendRecord = self._orig
# ---------------------------------------------------------------------------
# RecordMergingSocket (TLS 1.2 plaintext record merging)
# ---------------------------------------------------------------------------
class RecordMergingSocket:
"""Socket wrapper that rewrites consecutive TLS handshake records into
a single multi-message record. Only merges plaintext records that
precede ChangeCipherSpec."""
def __init__(self, sock):
self._sock = sock
self._pending = bytearray()
self._ver = 0x0303
self._after_ccs = False
self.merged_msgs = [] # [(n_msgs, [names], size)]
def _flush(self):
if not self._pending:
return
msgs = _parse_hs_types(self._pending)
n = len(msgs)
hdr = struct.pack("!BHH", 22, self._ver, len(self._pending))
self._sock.sendall(hdr + bytes(self._pending))
self.merged_msgs.append((n, msgs, len(self._pending)))
self._pending = bytearray()
# Called by BufferedSocket (one record per call, or multiple from flush)
def _process(self, data):
data = bytearray(data)
off = 0
while off + 5 <= len(data):
ct = data[off]
ver = struct.unpack("!H", data[off + 1 : off + 3])[0]
rlen = struct.unpack("!H", data[off + 3 : off + 5])[0]
if off + 5 + rlen > len(data):
break
payload = data[off + 5 : off + 5 + rlen]
if not self._after_ccs and ct == 22:
self._pending.extend(payload)
self._ver = ver
else:
if ct == 20:
self._after_ccs = True
self._flush()
self._sock.sendall(bytes(data[off : off + 5 + rlen]))
off += 5 + rlen
def send(self, data):
self._process(data)
return len(data)
def sendall(self, data):
self._process(data)
def recv(self, bufsize):
self._flush()
return self._sock.recv(bufsize)
def __getattr__(self, name):
return getattr(self._sock, name)
# ---------------------------------------------------------------------------
# Test runners
# ---------------------------------------------------------------------------
def run_tls12_test(cipher_wolf, cert_chain, priv_key, label,
do_reneg=True):
"""TLS 1.2 test one connection optionally exercises two code paths:
Phase 1 (plaintext grouping, initial handshake):
RecordMergingSocket rewrites separate plaintext ServerHello,
Certificate, ServerKeyExchange and ServerHelloDone records into
one multi-message TLS record before delivery to wolfSSL.
Phase 2 (encrypted grouping, renegotiation on same connection):
tlslite-ng server is monkey-patched to coalesce SH+Cert+SKE+SHD
into a single encrypted handshake record inside the renegotiation
(exercises wolfSSL's encrypted multi-message parsing including
curSize -= padSz for CBC padding).
Phase 2 is skipped when do_reneg=False (e.g. the wolfSSL client was
built without HAVE_SECURE_RENEGOTIATION).
"""
srv, port = _listen_socket()
result = {"ok": False, "error": ""}
msock_ref = [None]
trace_log = []
reneg_active = [False]
verify_data = {'client': None, 'server': None}
# --- monkey-patches (used only during this connection) ----------------
orig_calc_key = tlslite.tlsconnection.calc_key
def capturing_calc_key(*args, **kwargs):
res = orig_calc_key(*args, **kwargs)
lbl = args[3] if len(args) > 3 else kwargs.get('label', b'')
if lbl == b"client finished" and verify_data['client'] is None:
verify_data['client'] = bytearray(res)
elif lbl == b"server finished" and verify_data['server'] is None:
verify_data['server'] = bytearray(res)
return res
orig_getExt = HelloMessage.getExtension
def patched_getExt(self, ext_type):
ext = orig_getExt(self, ext_type)
if (ext_type == ExtensionType.renegotiation_info
and ext is not None and reneg_active[0]):
ext._internal_value = bytearray(0)
return ext
orig_rie_create = RenegotiationInfoExtension.create
def patched_rie_create(self, data):
if reneg_active[0] and data == bytearray(0):
combined = (bytearray(verify_data['client'])
+ bytearray(verify_data['server']))
return orig_rie_create(self, combined)
return orig_rie_create(self, data)
# ----------------------------------------------------------------------
def server():
try:
tlslite.tlsconnection.calc_key = capturing_calc_key
HelloMessage.getExtension = patched_getExt
RenegotiationInfoExtension.create = patched_rie_create
conn, _ = srv.accept()
conn.settimeout(15)
msock = RecordMergingSocket(conn)
msock_ref[0] = msock
tls = TLSConnection(msock)
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
# ---------- Phase 1: initial handshake (plaintext grouping) ----
tls.handshakeServer(certChain=cert_chain, privateKey=priv_key,
settings=settings)
tlslite.tlsconnection.calc_key = orig_calc_key
data = tls.recv(4096)
if do_reneg:
# ---------- Phase 2: trigger + run renegotiation ----------
hr = TLSMessage(ContentType.handshake,
bytearray([0, 0, 0, 0]))
for _ in tls._sendMsg(hr, randomizeFirstBlock=False,
update_hashes=False):
pass
# Bypass tlslite-ng renegotiation guards
tls.closed = True
tls.session = None
reneg_active[0] = True
# Coalesce handshake messages into ONE encrypted TLS record
def coalescing_sendMsgs(self, msgs):
for msg in msgs:
self._queue_message(msg)
yield from self._queue_flush()
tls._sendMsgs = types.MethodType(coalescing_sendMsgs, tls)
with _SendRecordTrace() as log:
tls.handshakeServer(certChain=cert_chain,
privateKey=priv_key,
settings=settings)
reneg_active[0] = False
trace_log.extend(log)
if data:
tls.send(data)
tls.close()
result["ok"] = True
except Exception as e:
import traceback
result["error"] = traceback.format_exc()
finally:
tlslite.tlsconnection.calc_key = orig_calc_key
HelloMessage.getExtension = orig_getExt
RenegotiationInfoExtension.create = orig_rie_create
reneg_active[0] = False
srv.close()
st = threading.Thread(target=server, daemon=True)
st.start()
time.sleep(0.1)
proc = _run_wolf_client(port, "3", cipher_wolf,
extra=("-R",) if do_reneg else ())
st.join(timeout=5)
if proc.returncode != 0 or not result["ok"]:
err = (result["error"]
or proc.stderr.decode("utf-8", errors="replace")[:400])
failed(f"{label}: connection failed ({err})")
return False
ok = True
# Phase 1 verification: plaintext multi-message record
msock = msock_ref[0]
has_pt_grouped = False
for n, msgs, sz in (msock.merged_msgs if msock else []):
if n > 1:
has_pt_grouped = True
passed(f"{label} [plaintext]: {n} msgs "
f"[{'+'.join(msgs)}] in one record ({sz} bytes)")
if not has_pt_grouped:
failed(f"{label} [plaintext]: no multi-message record detected")
ok = False
# Phase 2 verification: encrypted multi-message record (renego)
if do_reneg:
has_enc_grouped = False
for ct, enc, sz, msgs in trace_log:
if ct == ContentType.handshake and enc and len(msgs) > 1:
has_enc_grouped = True
passed(f"{label} [encrypted]: {len(msgs)} msgs "
f"[{'+'.join(msgs)}] in one record ({sz} bytes)")
if not has_enc_grouped:
failed(f"{label} [encrypted]: no multi-message "
f"encrypted record")
ok = False
return ok
def run_tls13_test(cipher_wolf, cert_chain, priv_key, label):
"""TLS 1.3: verify tlslite-ng sends multi-msg encrypted record and
wolfSSL client processes it."""
srv, port = _listen_socket()
result = {"ok": False, "error": ""}
def server():
try:
conn, _ = srv.accept()
conn.settimeout(15)
tls = TLSConnection(conn)
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
tls.handshakeServer(certChain=cert_chain, privateKey=priv_key,
settings=settings)
data = tls.recv(4096)
if data:
tls.send(data)
tls.close()
result["ok"] = True
except Exception as e:
result["error"] = str(e)
finally:
srv.close()
with _SendRecordTrace() as log:
st = threading.Thread(target=server, daemon=True)
st.start()
time.sleep(0.1)
proc = _run_wolf_client(port, "4", cipher_wolf)
st.join(timeout=5)
if proc.returncode != 0 or not result["ok"]:
err = result["error"] or proc.stderr.decode("utf-8", errors="replace")[:200]
failed(f"{label}: handshake failed ({err})")
return False
# Check that at least one encrypted handshake record has multiple messages
has_multi = False
for ct, enc, sz, msgs in log:
if ct == ContentType.handshake and enc and len(msgs) > 1:
has_multi = True
passed(f"{label}: {len(msgs)} encrypted msgs "
f"[{'+'.join(msgs)}] in one record ({sz} bytes)")
if not has_multi:
failed(f"{label}: no multi-message encrypted records")
return False
return True
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
if not os.path.isfile(WOLF_CLIENT):
print(f"ERROR: wolfSSL client not found: {WOLF_CLIENT}")
print(" Build wolfSSL first (./configure && make)")
sys.exit(1)
# Probe the client to see which features are compiled in so each
# phase of the test is only run when it can succeed.
feats = detect_wolf_features()
global WOLF_CA_CERT
WOLF_CA_CERT = feats["ca_cert"]
print("=" * 60)
print(" Multi-Message TLS Record Test")
print("=" * 60)
print(f" wolfSSL features: TLS1.2={feats['tls12']} "
f"TLS1.3={feats['tls13']} "
f"secure_reneg={feats['secure_reneg']} "
f"rsa={feats['rsa']}")
# The test certs are RSA; skip the whole test when the wolfSSL build
# has no RSA support (the client can't load or verify them).
if not feats["rsa"]:
print("\n wolfSSL built without RSA; skipping multi-msg-record "
"test (RSA test certs cannot be verified).")
sys.exit(77)
# Load certificate / key pairs
rsa_chain = _load_chain(os.path.join(CERT_DIR, "server-cert.pem"))
rsa_key = _load_key(os.path.join(CERT_DIR, "server-key.pem"))
# ------------------------------------------------------------------
# TLS 1.2 plaintext (initial HS) + optional encrypted (renegotiation)
# multi-message records, same connection per cipher suite.
# ------------------------------------------------------------------
tls12_suites = [
# (wolfSSL cipher name, description)
(None, "default negotiated"),
# AEAD (GCM)
("ECDHE-RSA-AES128-GCM-SHA256", "ECDHE-RSA AES128-GCM"),
("ECDHE-RSA-AES256-GCM-SHA384", "ECDHE-RSA AES256-GCM"),
("DHE-RSA-AES128-GCM-SHA256", "DHE-RSA AES128-GCM"),
("DHE-RSA-AES256-GCM-SHA384", "DHE-RSA AES256-GCM"),
# CBC + HMAC (exercises padding path)
("ECDHE-RSA-AES128-SHA256", "ECDHE-RSA AES128-CBC-SHA256"),
("ECDHE-RSA-AES256-SHA384", "ECDHE-RSA AES256-CBC-SHA384"),
("DHE-RSA-AES128-SHA256", "DHE-RSA AES128-CBC-SHA256"),
("DHE-RSA-AES256-SHA256", "DHE-RSA AES256-CBC-SHA256"),
# AEAD (ChaCha20-Poly1305)
("ECDHE-RSA-CHACHA20-POLY1305", "ECDHE-RSA CHACHA20-POLY1305"),
("DHE-RSA-CHACHA20-POLY1305", "DHE-RSA CHACHA20-POLY1305"),
]
if feats["tls12"]:
if feats["secure_reneg"]:
print("\n--- TLS 1.2: plaintext + encrypted multi-message "
"records ---")
print(" Each connection verifies BOTH code paths:")
print(" * initial handshake -> plaintext SH+Cert+SKE+SHD")
print(" * renegotiation -> encrypted SH+Cert+SKE+SHD")
else:
print("\n--- TLS 1.2: plaintext multi-message records ---")
print(" wolfSSL built without HAVE_SECURE_RENEGOTIATION;")
print(" skipping the encrypted (renegotiation) half.")
print(" Covers multiple key-exchanges, ciphers and MAC "
"families.\n")
for cipher, desc in tls12_suites:
if cipher and cipher not in feats["ciphers"]:
skipped(f"TLS1.2 {desc} - cipher not in wolfSSL build")
continue
run_tls12_test(cipher, rsa_chain, rsa_key,
f"TLS1.2 {desc}",
do_reneg=feats["secure_reneg"])
if not feats["secure_reneg"]:
skipped("TLS1.2 encrypted multi-msg record "
"(requires HAVE_SECURE_RENEGOTIATION)")
else:
skipped(f"TLS 1.2 tests ({len(tls12_suites)} suites) - "
"wolfSSL built without TLS 1.2")
# ------------------------------------------------------------------
# TLS 1.3 encrypted multi-message records
# ------------------------------------------------------------------
tls13_suites = [
# (wolfSSL cipher name, description)
(None, "default negotiated"),
("TLS13-AES128-GCM-SHA256", "AES-128-GCM"),
("TLS13-AES256-GCM-SHA384", "AES-256-GCM"),
("TLS13-CHACHA20-POLY1305-SHA256", "CHACHA20-POLY1305"),
]
if feats["tls13"]:
print("\n--- TLS 1.3: encrypted multi-message records ---")
print(" Server sends EE+Cert+CV+Fin in a single encrypted "
"record;")
print(" wolfSSL client must decrypt and parse.\n")
for cipher, desc in tls13_suites:
if cipher and cipher not in feats["ciphers"]:
skipped(f"TLS1.3 {desc} - cipher not in wolfSSL build")
continue
run_tls13_test(cipher, rsa_chain, rsa_key,
f"TLS1.3 {desc}")
else:
skipped(f"TLS 1.3 tests ({len(tls13_suites)} suites) - "
"wolfSSL built without TLS 1.3")
# ------------------------------------------------------------------
# Summary
# ------------------------------------------------------------------
print()
print("=" * 60)
print(f" Results: {PASS_COUNT} passed, {FAIL_COUNT} failed, "
f"{SKIP_COUNT} skipped")
print("=" * 60)
# If nothing at all could run, signal SKIP (exit 77) so automake
# records the test as skipped rather than passed-with-nothing.
if PASS_COUNT == 0 and FAIL_COUNT == 0:
sys.exit(77)
return FAIL_COUNT == 0
if __name__ == "__main__":
sys.exit(0 if main() else 1)