mirror of
https://github.com/wolfSSL/wolfssl
synced 2026-05-24 10:18:22 +00:00
The test certs are RSA; if NO_RSA is defined the client can neither load nor verify them. Detect "RSA not supported" in client -? help and exit 77 (SKIP) before tlslite-ng tries to use the RSA chain.
655 lines
24 KiB
Python
Executable file
655 lines
24 KiB
Python
Executable file
#!/usr/bin/env python3
|
||
#
|
||
# multi-msg-record.py
|
||
#
|
||
# Python half of scripts/multi-msg-record.test (the bash wrapper handles
|
||
# NETWORK_UNSHARE_HELPER / AM_BWRAPPED and the python3 availability
|
||
# check, then execs this script).
|
||
#
|
||
# Tests that wolfSSL correctly processes TLS records containing multiple
|
||
# handshake messages packed into a single record.
|
||
#
|
||
# Uses tlslite-ng as the TLS peer to craft multi-message records:
|
||
#
|
||
# TLS 1.2 – Each connection tests TWO code paths back-to-back:
|
||
# 1. Initial handshake: RecordMergingSocket rewrites separate
|
||
# plaintext ServerHello + Certificate + ServerKeyExchange +
|
||
# ServerHelloDone records into one multi-message TLS
|
||
# record before forwarding to the wolfSSL client.
|
||
# 2. Renegotiation on the same connection: tlslite-ng is
|
||
# monkey-patched to coalesce SH+Cert+SKE+SHD into ONE
|
||
# encrypted handshake record (exercises the
|
||
# curSize -= padSz CBC-padding path and the AEAD path).
|
||
#
|
||
# TLS 1.3 – tlslite-ng's _queue_message / _queue_flush mechanism already
|
||
# coalesces EncryptedExtensions + Certificate + CertificateVerify
|
||
# + Finished into a single encrypted record. The test verifies
|
||
# that wolfSSL parses this correctly.
|
||
#
|
||
# Multiple cipher suites are tested for both protocol versions.
|
||
#
|
||
# Requirements: python3, tlslite-ng (pip install tlslite-ng)
|
||
|
||
import socket
|
||
import struct
|
||
import subprocess
|
||
import os
|
||
import sys
|
||
import threading
|
||
import time
|
||
import types
|
||
|
||
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
WOLFSSL_DIR = os.path.dirname(SCRIPT_DIR)
|
||
WOLF_CLIENT = os.path.join(WOLFSSL_DIR, "examples", "client", "client")
|
||
CERT_DIR = os.path.join(WOLFSSL_DIR, "certs")
|
||
|
||
# CA cert path passed to the wolfSSL client via -A. Set in main() after
|
||
# detect_wolf_features() determines whether the build accepts PEM or DER.
|
||
WOLF_CA_CERT = os.path.join(CERT_DIR, "ca-cert.pem")
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Bypass a strict tlslite-ng validation that rejects wolfSSL's ClientHello
|
||
# when the client advertises FFDHE groups in a TLS-1.3-only hello.
|
||
# This must happen before importing TLSConnection.
|
||
#
|
||
# If tlslite-ng isn't installed we exit 77 so automake marks the test
|
||
# SKIPped instead of FAILed.
|
||
# ---------------------------------------------------------------------------
|
||
try:
|
||
import tlslite.tlsconnection # noqa: E402
|
||
import tlslite.recordlayer # noqa: E402
|
||
tlslite.tlsconnection.TLS_1_3_FORBIDDEN_GROUPS = frozenset()
|
||
|
||
from tlslite import ( # noqa: E402
|
||
TLSConnection, HandshakeSettings, X509CertChain, parsePEMKey,
|
||
)
|
||
from tlslite.constants import ContentType # noqa: E402
|
||
from tlslite.extensions import RenegotiationInfoExtension # noqa: E402
|
||
from tlslite.constants import ExtensionType # noqa: E402
|
||
from tlslite.messages import HelloMessage, Message as TLSMessage # noqa: E402
|
||
except ImportError as e:
|
||
sys.stdout.write(
|
||
"tlslite-ng not installed ({}); skipping multi-msg-record test\n"
|
||
" (install with: pip install tlslite-ng)\n".format(e))
|
||
sys.exit(77)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers
|
||
# ---------------------------------------------------------------------------
|
||
HS_NAMES = {
|
||
2: "SH", 4: "NST", 8: "EE", 11: "Cert", 12: "SKE",
|
||
13: "CR", 14: "SHD", 15: "CV", 16: "CKE", 20: "Fin",
|
||
}
|
||
|
||
PASS_COUNT = 0
|
||
FAIL_COUNT = 0
|
||
SKIP_COUNT = 0
|
||
|
||
|
||
def passed(label):
|
||
global PASS_COUNT
|
||
PASS_COUNT += 1
|
||
print(f" PASS: {label}")
|
||
|
||
|
||
def failed(label):
|
||
global FAIL_COUNT
|
||
FAIL_COUNT += 1
|
||
print(f" FAIL: {label}")
|
||
|
||
|
||
def skipped(label):
|
||
global SKIP_COUNT
|
||
SKIP_COUNT += 1
|
||
print(f" SKIP: {label}")
|
||
|
||
|
||
def detect_wolf_features():
|
||
"""Probe the wolfSSL client binary to find which features are
|
||
compiled in. Used to decide which test phases to run.
|
||
|
||
Returns dict with keys: tls12 (bool), tls13 (bool),
|
||
secure_reneg (bool), rsa (bool), ciphers (set[str]), ca_cert (str).
|
||
"""
|
||
feats = {"tls12": False, "tls13": False, "secure_reneg": False,
|
||
"rsa": True,
|
||
"ciphers": set(),
|
||
"ca_cert": os.path.join(CERT_DIR, "ca-cert.pem")}
|
||
|
||
# ./client -V -> e.g. "3:4:d(downgrade):e(either):"
|
||
try:
|
||
r = subprocess.run([WOLF_CLIENT, "-V"],
|
||
capture_output=True, timeout=5)
|
||
parts = r.stdout.decode("utf-8", errors="replace").strip().split(":")
|
||
feats["tls12"] = "3" in parts
|
||
feats["tls13"] = "4" in parts
|
||
except Exception:
|
||
pass
|
||
|
||
# ./client -? -> help text includes "-R" only when
|
||
# HAVE_SECURE_RENEGOTIATION is defined. The default -A path
|
||
# ("ca-cert.pem" vs "ca-cert.der") also tells us which CA file
|
||
# format the build can load. The RSA key-size line reports
|
||
# "RSA not supported" when NO_RSA is defined.
|
||
try:
|
||
r = subprocess.run([WOLF_CLIENT, "-?"],
|
||
capture_output=True, timeout=5)
|
||
htxt = r.stdout.decode("utf-8", errors="replace")
|
||
feats["secure_reneg"] = ("Allow Secure Renegotiation" in htxt)
|
||
if "ca-cert.der" in htxt and "ca-cert.pem" not in htxt:
|
||
feats["ca_cert"] = os.path.join(CERT_DIR, "ca-cert.der")
|
||
if "RSA not supported" in htxt:
|
||
feats["rsa"] = False
|
||
except Exception:
|
||
pass
|
||
|
||
# ./client -e -> colon-separated list of supported cipher suites.
|
||
try:
|
||
r = subprocess.run([WOLF_CLIENT, "-e"],
|
||
capture_output=True, timeout=5)
|
||
ctxt = r.stdout.decode("utf-8", errors="replace").strip()
|
||
feats["ciphers"] = {c for c in ctxt.split(":") if c}
|
||
except Exception:
|
||
pass
|
||
|
||
return feats
|
||
|
||
|
||
def _load_chain(cert_file):
|
||
with open(cert_file) as f:
|
||
chain = X509CertChain()
|
||
chain.parsePemList(f.read())
|
||
return chain
|
||
|
||
|
||
def _load_key(key_file):
|
||
with open(key_file) as f:
|
||
return parsePEMKey(f.read(), private=True)
|
||
|
||
|
||
def _parse_hs_types(data):
|
||
"""Parse handshake message types from raw handshake content."""
|
||
msgs = []
|
||
off = 0
|
||
while off + 4 <= len(data):
|
||
ht = data[off]
|
||
hl = struct.unpack("!I", b"\x00" + bytes(data[off + 1 : off + 4]))[0]
|
||
msgs.append(HS_NAMES.get(ht, f"T{ht}"))
|
||
off += 4 + hl
|
||
return msgs
|
||
|
||
|
||
def _get_free_port():
|
||
"""Get an available TCP port."""
|
||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||
s.bind(("127.0.0.1", 0))
|
||
return s.getsockname()[1]
|
||
|
||
|
||
def _listen_socket():
|
||
"""Bind a listening TCP socket on localhost with the standard test timeout."""
|
||
port = _get_free_port()
|
||
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||
srv.bind(("127.0.0.1", port))
|
||
srv.listen(1)
|
||
srv.settimeout(15)
|
||
return srv, port
|
||
|
||
|
||
def _run_wolf_client(port, version, cipher, extra=()):
|
||
"""Invoke the wolfSSL example client against 127.0.0.1:port.
|
||
|
||
WOLF_CA_CERT is PEM or DER depending on the build (NO_CODING /
|
||
OPENSSL_EXTRA builds don't both support PEM).
|
||
"""
|
||
cmd = [WOLF_CLIENT, "-h", "127.0.0.1", "-p", str(port),
|
||
"-v", version, "-A", WOLF_CA_CERT,
|
||
"-g", *extra]
|
||
if cipher:
|
||
cmd.extend(["-l", cipher])
|
||
return subprocess.run(cmd, capture_output=True, timeout=15)
|
||
|
||
|
||
class _SendRecordTrace:
|
||
"""Context manager that wraps RecordLayer.sendRecord to log every record."""
|
||
|
||
def __init__(self):
|
||
self.log = []
|
||
self._orig = None
|
||
|
||
def __enter__(self):
|
||
self._orig = tlslite.recordlayer.RecordLayer.sendRecord
|
||
log = self.log
|
||
orig = self._orig
|
||
|
||
def wrapper(self_rl, msg):
|
||
data = msg.write()
|
||
ct = msg.contentType
|
||
encrypted = bool(self_rl._writeState
|
||
and self_rl._writeState.encContext)
|
||
hs_msgs = []
|
||
if ct == ContentType.handshake:
|
||
hs_msgs = _parse_hs_types(data)
|
||
log.append((ct, encrypted, len(data), hs_msgs))
|
||
yield from orig(self_rl, msg)
|
||
|
||
tlslite.recordlayer.RecordLayer.sendRecord = wrapper
|
||
return self.log
|
||
|
||
def __exit__(self, *exc):
|
||
tlslite.recordlayer.RecordLayer.sendRecord = self._orig
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# RecordMergingSocket (TLS 1.2 plaintext record merging)
|
||
# ---------------------------------------------------------------------------
|
||
class RecordMergingSocket:
|
||
"""Socket wrapper that rewrites consecutive TLS handshake records into
|
||
a single multi-message record. Only merges plaintext records that
|
||
precede ChangeCipherSpec."""
|
||
|
||
def __init__(self, sock):
|
||
self._sock = sock
|
||
self._pending = bytearray()
|
||
self._ver = 0x0303
|
||
self._after_ccs = False
|
||
self.merged_msgs = [] # [(n_msgs, [names], size)]
|
||
|
||
def _flush(self):
|
||
if not self._pending:
|
||
return
|
||
msgs = _parse_hs_types(self._pending)
|
||
n = len(msgs)
|
||
hdr = struct.pack("!BHH", 22, self._ver, len(self._pending))
|
||
self._sock.sendall(hdr + bytes(self._pending))
|
||
self.merged_msgs.append((n, msgs, len(self._pending)))
|
||
self._pending = bytearray()
|
||
|
||
# Called by BufferedSocket (one record per call, or multiple from flush)
|
||
def _process(self, data):
|
||
data = bytearray(data)
|
||
off = 0
|
||
while off + 5 <= len(data):
|
||
ct = data[off]
|
||
ver = struct.unpack("!H", data[off + 1 : off + 3])[0]
|
||
rlen = struct.unpack("!H", data[off + 3 : off + 5])[0]
|
||
if off + 5 + rlen > len(data):
|
||
break
|
||
payload = data[off + 5 : off + 5 + rlen]
|
||
if not self._after_ccs and ct == 22:
|
||
self._pending.extend(payload)
|
||
self._ver = ver
|
||
else:
|
||
if ct == 20:
|
||
self._after_ccs = True
|
||
self._flush()
|
||
self._sock.sendall(bytes(data[off : off + 5 + rlen]))
|
||
off += 5 + rlen
|
||
|
||
def send(self, data):
|
||
self._process(data)
|
||
return len(data)
|
||
|
||
def sendall(self, data):
|
||
self._process(data)
|
||
|
||
def recv(self, bufsize):
|
||
self._flush()
|
||
return self._sock.recv(bufsize)
|
||
|
||
def __getattr__(self, name):
|
||
return getattr(self._sock, name)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Test runners
|
||
# ---------------------------------------------------------------------------
|
||
def run_tls12_test(cipher_wolf, cert_chain, priv_key, label,
|
||
do_reneg=True):
|
||
"""TLS 1.2 test – one connection optionally exercises two code paths:
|
||
|
||
Phase 1 (plaintext grouping, initial handshake):
|
||
RecordMergingSocket rewrites separate plaintext ServerHello,
|
||
Certificate, ServerKeyExchange and ServerHelloDone records into
|
||
one multi-message TLS record before delivery to wolfSSL.
|
||
|
||
Phase 2 (encrypted grouping, renegotiation on same connection):
|
||
tlslite-ng server is monkey-patched to coalesce SH+Cert+SKE+SHD
|
||
into a single encrypted handshake record inside the renegotiation
|
||
(exercises wolfSSL's encrypted multi-message parsing including
|
||
curSize -= padSz for CBC padding).
|
||
|
||
Phase 2 is skipped when do_reneg=False (e.g. the wolfSSL client was
|
||
built without HAVE_SECURE_RENEGOTIATION).
|
||
"""
|
||
srv, port = _listen_socket()
|
||
|
||
result = {"ok": False, "error": ""}
|
||
msock_ref = [None]
|
||
trace_log = []
|
||
reneg_active = [False]
|
||
verify_data = {'client': None, 'server': None}
|
||
|
||
# --- monkey-patches (used only during this connection) ----------------
|
||
orig_calc_key = tlslite.tlsconnection.calc_key
|
||
|
||
def capturing_calc_key(*args, **kwargs):
|
||
res = orig_calc_key(*args, **kwargs)
|
||
lbl = args[3] if len(args) > 3 else kwargs.get('label', b'')
|
||
if lbl == b"client finished" and verify_data['client'] is None:
|
||
verify_data['client'] = bytearray(res)
|
||
elif lbl == b"server finished" and verify_data['server'] is None:
|
||
verify_data['server'] = bytearray(res)
|
||
return res
|
||
|
||
orig_getExt = HelloMessage.getExtension
|
||
|
||
def patched_getExt(self, ext_type):
|
||
ext = orig_getExt(self, ext_type)
|
||
if (ext_type == ExtensionType.renegotiation_info
|
||
and ext is not None and reneg_active[0]):
|
||
ext._internal_value = bytearray(0)
|
||
return ext
|
||
|
||
orig_rie_create = RenegotiationInfoExtension.create
|
||
|
||
def patched_rie_create(self, data):
|
||
if reneg_active[0] and data == bytearray(0):
|
||
combined = (bytearray(verify_data['client'])
|
||
+ bytearray(verify_data['server']))
|
||
return orig_rie_create(self, combined)
|
||
return orig_rie_create(self, data)
|
||
|
||
# ----------------------------------------------------------------------
|
||
def server():
|
||
try:
|
||
tlslite.tlsconnection.calc_key = capturing_calc_key
|
||
HelloMessage.getExtension = patched_getExt
|
||
RenegotiationInfoExtension.create = patched_rie_create
|
||
|
||
conn, _ = srv.accept()
|
||
conn.settimeout(15)
|
||
msock = RecordMergingSocket(conn)
|
||
msock_ref[0] = msock
|
||
tls = TLSConnection(msock)
|
||
settings = HandshakeSettings()
|
||
settings.minVersion = (3, 3)
|
||
settings.maxVersion = (3, 3)
|
||
|
||
# ---------- Phase 1: initial handshake (plaintext grouping) ----
|
||
tls.handshakeServer(certChain=cert_chain, privateKey=priv_key,
|
||
settings=settings)
|
||
tlslite.tlsconnection.calc_key = orig_calc_key
|
||
|
||
data = tls.recv(4096)
|
||
|
||
if do_reneg:
|
||
# ---------- Phase 2: trigger + run renegotiation ----------
|
||
hr = TLSMessage(ContentType.handshake,
|
||
bytearray([0, 0, 0, 0]))
|
||
for _ in tls._sendMsg(hr, randomizeFirstBlock=False,
|
||
update_hashes=False):
|
||
pass
|
||
|
||
# Bypass tlslite-ng renegotiation guards
|
||
tls.closed = True
|
||
tls.session = None
|
||
reneg_active[0] = True
|
||
|
||
# Coalesce handshake messages into ONE encrypted TLS record
|
||
def coalescing_sendMsgs(self, msgs):
|
||
for msg in msgs:
|
||
self._queue_message(msg)
|
||
yield from self._queue_flush()
|
||
tls._sendMsgs = types.MethodType(coalescing_sendMsgs, tls)
|
||
|
||
with _SendRecordTrace() as log:
|
||
tls.handshakeServer(certChain=cert_chain,
|
||
privateKey=priv_key,
|
||
settings=settings)
|
||
reneg_active[0] = False
|
||
trace_log.extend(log)
|
||
|
||
if data:
|
||
tls.send(data)
|
||
tls.close()
|
||
result["ok"] = True
|
||
except Exception as e:
|
||
import traceback
|
||
result["error"] = traceback.format_exc()
|
||
finally:
|
||
tlslite.tlsconnection.calc_key = orig_calc_key
|
||
HelloMessage.getExtension = orig_getExt
|
||
RenegotiationInfoExtension.create = orig_rie_create
|
||
reneg_active[0] = False
|
||
srv.close()
|
||
|
||
st = threading.Thread(target=server, daemon=True)
|
||
st.start()
|
||
time.sleep(0.1)
|
||
|
||
proc = _run_wolf_client(port, "3", cipher_wolf,
|
||
extra=("-R",) if do_reneg else ())
|
||
st.join(timeout=5)
|
||
|
||
if proc.returncode != 0 or not result["ok"]:
|
||
err = (result["error"]
|
||
or proc.stderr.decode("utf-8", errors="replace")[:400])
|
||
failed(f"{label}: connection failed ({err})")
|
||
return False
|
||
|
||
ok = True
|
||
|
||
# Phase 1 verification: plaintext multi-message record
|
||
msock = msock_ref[0]
|
||
has_pt_grouped = False
|
||
for n, msgs, sz in (msock.merged_msgs if msock else []):
|
||
if n > 1:
|
||
has_pt_grouped = True
|
||
passed(f"{label} [plaintext]: {n} msgs "
|
||
f"[{'+'.join(msgs)}] in one record ({sz} bytes)")
|
||
if not has_pt_grouped:
|
||
failed(f"{label} [plaintext]: no multi-message record detected")
|
||
ok = False
|
||
|
||
# Phase 2 verification: encrypted multi-message record (renego)
|
||
if do_reneg:
|
||
has_enc_grouped = False
|
||
for ct, enc, sz, msgs in trace_log:
|
||
if ct == ContentType.handshake and enc and len(msgs) > 1:
|
||
has_enc_grouped = True
|
||
passed(f"{label} [encrypted]: {len(msgs)} msgs "
|
||
f"[{'+'.join(msgs)}] in one record ({sz} bytes)")
|
||
if not has_enc_grouped:
|
||
failed(f"{label} [encrypted]: no multi-message "
|
||
f"encrypted record")
|
||
ok = False
|
||
|
||
return ok
|
||
|
||
|
||
def run_tls13_test(cipher_wolf, cert_chain, priv_key, label):
|
||
"""TLS 1.3: verify tlslite-ng sends multi-msg encrypted record and
|
||
wolfSSL client processes it."""
|
||
srv, port = _listen_socket()
|
||
|
||
result = {"ok": False, "error": ""}
|
||
|
||
def server():
|
||
try:
|
||
conn, _ = srv.accept()
|
||
conn.settimeout(15)
|
||
tls = TLSConnection(conn)
|
||
settings = HandshakeSettings()
|
||
settings.minVersion = (3, 4)
|
||
settings.maxVersion = (3, 4)
|
||
tls.handshakeServer(certChain=cert_chain, privateKey=priv_key,
|
||
settings=settings)
|
||
data = tls.recv(4096)
|
||
if data:
|
||
tls.send(data)
|
||
tls.close()
|
||
result["ok"] = True
|
||
except Exception as e:
|
||
result["error"] = str(e)
|
||
finally:
|
||
srv.close()
|
||
|
||
with _SendRecordTrace() as log:
|
||
st = threading.Thread(target=server, daemon=True)
|
||
st.start()
|
||
time.sleep(0.1)
|
||
|
||
proc = _run_wolf_client(port, "4", cipher_wolf)
|
||
st.join(timeout=5)
|
||
|
||
if proc.returncode != 0 or not result["ok"]:
|
||
err = result["error"] or proc.stderr.decode("utf-8", errors="replace")[:200]
|
||
failed(f"{label}: handshake failed ({err})")
|
||
return False
|
||
|
||
# Check that at least one encrypted handshake record has multiple messages
|
||
has_multi = False
|
||
for ct, enc, sz, msgs in log:
|
||
if ct == ContentType.handshake and enc and len(msgs) > 1:
|
||
has_multi = True
|
||
passed(f"{label}: {len(msgs)} encrypted msgs "
|
||
f"[{'+'.join(msgs)}] in one record ({sz} bytes)")
|
||
if not has_multi:
|
||
failed(f"{label}: no multi-message encrypted records")
|
||
return False
|
||
return True
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main
|
||
# ---------------------------------------------------------------------------
|
||
def main():
|
||
if not os.path.isfile(WOLF_CLIENT):
|
||
print(f"ERROR: wolfSSL client not found: {WOLF_CLIENT}")
|
||
print(" Build wolfSSL first (./configure && make)")
|
||
sys.exit(1)
|
||
|
||
# Probe the client to see which features are compiled in so each
|
||
# phase of the test is only run when it can succeed.
|
||
feats = detect_wolf_features()
|
||
global WOLF_CA_CERT
|
||
WOLF_CA_CERT = feats["ca_cert"]
|
||
|
||
print("=" * 60)
|
||
print(" Multi-Message TLS Record Test")
|
||
print("=" * 60)
|
||
print(f" wolfSSL features: TLS1.2={feats['tls12']} "
|
||
f"TLS1.3={feats['tls13']} "
|
||
f"secure_reneg={feats['secure_reneg']} "
|
||
f"rsa={feats['rsa']}")
|
||
|
||
# The test certs are RSA; skip the whole test when the wolfSSL build
|
||
# has no RSA support (the client can't load or verify them).
|
||
if not feats["rsa"]:
|
||
print("\n wolfSSL built without RSA; skipping multi-msg-record "
|
||
"test (RSA test certs cannot be verified).")
|
||
sys.exit(77)
|
||
|
||
# Load certificate / key pairs
|
||
rsa_chain = _load_chain(os.path.join(CERT_DIR, "server-cert.pem"))
|
||
rsa_key = _load_key(os.path.join(CERT_DIR, "server-key.pem"))
|
||
|
||
# ------------------------------------------------------------------
|
||
# TLS 1.2 – plaintext (initial HS) + optional encrypted (renegotiation)
|
||
# multi-message records, same connection per cipher suite.
|
||
# ------------------------------------------------------------------
|
||
tls12_suites = [
|
||
# (wolfSSL cipher name, description)
|
||
(None, "default negotiated"),
|
||
# AEAD (GCM)
|
||
("ECDHE-RSA-AES128-GCM-SHA256", "ECDHE-RSA AES128-GCM"),
|
||
("ECDHE-RSA-AES256-GCM-SHA384", "ECDHE-RSA AES256-GCM"),
|
||
("DHE-RSA-AES128-GCM-SHA256", "DHE-RSA AES128-GCM"),
|
||
("DHE-RSA-AES256-GCM-SHA384", "DHE-RSA AES256-GCM"),
|
||
# CBC + HMAC (exercises padding path)
|
||
("ECDHE-RSA-AES128-SHA256", "ECDHE-RSA AES128-CBC-SHA256"),
|
||
("ECDHE-RSA-AES256-SHA384", "ECDHE-RSA AES256-CBC-SHA384"),
|
||
("DHE-RSA-AES128-SHA256", "DHE-RSA AES128-CBC-SHA256"),
|
||
("DHE-RSA-AES256-SHA256", "DHE-RSA AES256-CBC-SHA256"),
|
||
# AEAD (ChaCha20-Poly1305)
|
||
("ECDHE-RSA-CHACHA20-POLY1305", "ECDHE-RSA CHACHA20-POLY1305"),
|
||
("DHE-RSA-CHACHA20-POLY1305", "DHE-RSA CHACHA20-POLY1305"),
|
||
]
|
||
|
||
if feats["tls12"]:
|
||
if feats["secure_reneg"]:
|
||
print("\n--- TLS 1.2: plaintext + encrypted multi-message "
|
||
"records ---")
|
||
print(" Each connection verifies BOTH code paths:")
|
||
print(" * initial handshake -> plaintext SH+Cert+SKE+SHD")
|
||
print(" * renegotiation -> encrypted SH+Cert+SKE+SHD")
|
||
else:
|
||
print("\n--- TLS 1.2: plaintext multi-message records ---")
|
||
print(" wolfSSL built without HAVE_SECURE_RENEGOTIATION;")
|
||
print(" skipping the encrypted (renegotiation) half.")
|
||
print(" Covers multiple key-exchanges, ciphers and MAC "
|
||
"families.\n")
|
||
|
||
for cipher, desc in tls12_suites:
|
||
if cipher and cipher not in feats["ciphers"]:
|
||
skipped(f"TLS1.2 {desc} - cipher not in wolfSSL build")
|
||
continue
|
||
run_tls12_test(cipher, rsa_chain, rsa_key,
|
||
f"TLS1.2 {desc}",
|
||
do_reneg=feats["secure_reneg"])
|
||
if not feats["secure_reneg"]:
|
||
skipped("TLS1.2 encrypted multi-msg record "
|
||
"(requires HAVE_SECURE_RENEGOTIATION)")
|
||
else:
|
||
skipped(f"TLS 1.2 tests ({len(tls12_suites)} suites) - "
|
||
"wolfSSL built without TLS 1.2")
|
||
|
||
# ------------------------------------------------------------------
|
||
# TLS 1.3 – encrypted multi-message records
|
||
# ------------------------------------------------------------------
|
||
tls13_suites = [
|
||
# (wolfSSL cipher name, description)
|
||
(None, "default negotiated"),
|
||
("TLS13-AES128-GCM-SHA256", "AES-128-GCM"),
|
||
("TLS13-AES256-GCM-SHA384", "AES-256-GCM"),
|
||
("TLS13-CHACHA20-POLY1305-SHA256", "CHACHA20-POLY1305"),
|
||
]
|
||
|
||
if feats["tls13"]:
|
||
print("\n--- TLS 1.3: encrypted multi-message records ---")
|
||
print(" Server sends EE+Cert+CV+Fin in a single encrypted "
|
||
"record;")
|
||
print(" wolfSSL client must decrypt and parse.\n")
|
||
|
||
for cipher, desc in tls13_suites:
|
||
if cipher and cipher not in feats["ciphers"]:
|
||
skipped(f"TLS1.3 {desc} - cipher not in wolfSSL build")
|
||
continue
|
||
run_tls13_test(cipher, rsa_chain, rsa_key,
|
||
f"TLS1.3 {desc}")
|
||
else:
|
||
skipped(f"TLS 1.3 tests ({len(tls13_suites)} suites) - "
|
||
"wolfSSL built without TLS 1.3")
|
||
|
||
# ------------------------------------------------------------------
|
||
# Summary
|
||
# ------------------------------------------------------------------
|
||
print()
|
||
print("=" * 60)
|
||
print(f" Results: {PASS_COUNT} passed, {FAIL_COUNT} failed, "
|
||
f"{SKIP_COUNT} skipped")
|
||
print("=" * 60)
|
||
|
||
# If nothing at all could run, signal SKIP (exit 77) so automake
|
||
# records the test as skipped rather than passed-with-nothing.
|
||
if PASS_COUNT == 0 and FAIL_COUNT == 0:
|
||
sys.exit(77)
|
||
|
||
return FAIL_COUNT == 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(0 if main() else 1)
|