Skip to content

Commit

Permalink
Convert symmetric ciphers to Rust
Browse files Browse the repository at this point in the history
  • Loading branch information
alex committed Jan 31, 2024
1 parent 722a639 commit b3a9f9c
Show file tree
Hide file tree
Showing 16 changed files with 899 additions and 685 deletions.
166 changes: 4 additions & 162 deletions src/cryptography/hazmat/backends/openssl/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,12 @@

import collections
import contextlib
import itertools
import typing

from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.bindings.openssl import binding
from cryptography.hazmat.decrepit.ciphers.algorithms import (
ARC4,
CAST5,
IDEA,
SEED,
Blowfish,
TripleDES,
)
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding
from cryptography.hazmat.primitives.asymmetric import ec
Expand All @@ -40,21 +30,9 @@
)
from cryptography.hazmat.primitives.ciphers.algorithms import (
AES,
AES128,
AES256,
SM4,
Camellia,
ChaCha20,
)
from cryptography.hazmat.primitives.ciphers.modes import (
CBC,
CFB,
CFB8,
CTR,
ECB,
GCM,
OFB,
XTS,
Mode,
)
from cryptography.hazmat.primitives.serialization.pkcs12 import (
Expand All @@ -70,7 +48,7 @@

# Not actually supported, just used as a marker for some serialization tests.
class _RC2:
pass
key_size = 128


class Backend:
Expand Down Expand Up @@ -117,25 +95,15 @@ def __init__(self) -> None:
self._lib = self._binding.lib
self._fips_enabled = rust_openssl.is_fips_enabled()

self._cipher_registry: dict[
tuple[type[CipherAlgorithm], type[Mode]],
typing.Callable,
] = {}
self._register_default_ciphers()

def __repr__(self) -> str:
return "<OpenSSLBackend(version: {}, FIPS: {}, Legacy: {})>".format(
self.openssl_version_text(),
self._fips_enabled,
self._binding._legacy_provider_loaded,
)

def openssl_assert(
self,
ok: bool,
errors: list[rust_openssl.OpenSSLError] | None = None,
) -> None:
return binding._openssl_assert(ok, errors=errors)
def openssl_assert(self, ok: bool) -> None:
return binding._openssl_assert(ok)

def _enable_fips(self) -> None:
# This function enables FIPS mode for OpenSSL 3.0.0 on installs that
Expand Down Expand Up @@ -210,103 +178,7 @@ def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool:
if not isinstance(cipher, self._fips_ciphers):
return False

try:
adapter = self._cipher_registry[type(cipher), type(mode)]
except KeyError:
return False
evp_cipher = adapter(self, cipher, mode)
return self._ffi.NULL != evp_cipher

def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None:
if (cipher_cls, mode_cls) in self._cipher_registry:
raise ValueError(
f"Duplicate registration for: {cipher_cls} {mode_cls}."
)
self._cipher_registry[cipher_cls, mode_cls] = adapter

def _register_default_ciphers(self) -> None:
for cipher_cls in [AES, AES128, AES256]:
for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]:
self.register_cipher_adapter(
cipher_cls,
mode_cls,
GetCipherByName(
"{cipher.name}-{cipher.key_size}-{mode.name}"
),
)
for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
self.register_cipher_adapter(
Camellia,
mode_cls,
GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
)
for mode_cls in [CBC, CFB, CFB8, OFB]:
self.register_cipher_adapter(
TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
)
self.register_cipher_adapter(
TripleDES, ECB, GetCipherByName("des-ede3")
)
# ChaCha20 uses the Long Name "chacha20" in OpenSSL, but in LibreSSL
# it uses "chacha"
self.register_cipher_adapter(
ChaCha20,
type(None),
GetCipherByName(
"chacha" if self._lib.CRYPTOGRAPHY_IS_LIBRESSL else "chacha20"
),
)
self.register_cipher_adapter(AES, XTS, _get_xts_cipher)
for mode_cls in [ECB, CBC, OFB, CFB, CTR, GCM]:
self.register_cipher_adapter(
SM4, mode_cls, GetCipherByName("sm4-{mode.name}")
)
# Don't register legacy ciphers if they're unavailable. Hypothetically
# this wouldn't be necessary because we test availability by seeing if
# we get an EVP_CIPHER * in the _CipherContext __init__, but OpenSSL 3
# will return a valid pointer even though the cipher is unavailable.
if (
self._binding._legacy_provider_loaded
or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
):
for mode_cls in [CBC, CFB, OFB, ECB]:
self.register_cipher_adapter(
Blowfish,
mode_cls,
GetCipherByName("bf-{mode.name}"),
)
for mode_cls in [CBC, CFB, OFB, ECB]:
self.register_cipher_adapter(
SEED,
mode_cls,
GetCipherByName("seed-{mode.name}"),
)
for cipher_cls, mode_cls in itertools.product(
[CAST5, IDEA],
[CBC, OFB, CFB, ECB],
):
self.register_cipher_adapter(
cipher_cls,
mode_cls,
GetCipherByName("{cipher.name}-{mode.name}"),
)
self.register_cipher_adapter(
ARC4, type(None), GetCipherByName("rc4")
)
# We don't actually support RC2, this is just used by some tests.
self.register_cipher_adapter(
_RC2, type(None), GetCipherByName("rc2")
)

def create_symmetric_encryption_ctx(
self, cipher: CipherAlgorithm, mode: Mode
) -> _CipherContext:
return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)

def create_symmetric_decryption_ctx(
self, cipher: CipherAlgorithm, mode: Mode
) -> _CipherContext:
return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
return rust_openssl.ciphers.cipher_supported(cipher, mode)

def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
return self.hmac_supported(algorithm)
Expand Down Expand Up @@ -841,34 +713,4 @@ def pkcs7_supported(self) -> bool:
return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL


class GetCipherByName:
def __init__(self, fmt: str):
self._fmt = fmt

def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode):
cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower()
evp_cipher = backend._lib.EVP_get_cipherbyname(
cipher_name.encode("ascii")
)

# try EVP_CIPHER_fetch if present
if (
evp_cipher == backend._ffi.NULL
and backend._lib.Cryptography_HAS_300_EVP_CIPHER
):
evp_cipher = backend._lib.EVP_CIPHER_fetch(
backend._ffi.NULL,
cipher_name.encode("ascii"),
backend._ffi.NULL,
)

backend._consume_errors()
return evp_cipher


def _get_xts_cipher(backend: Backend, cipher: AES, mode):
cipher_name = f"aes-{cipher.key_size // 2}-xts"
return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))


backend = Backend()
Loading

0 comments on commit b3a9f9c

Please sign in to comment.