Primer commit del proyecto RSS

This commit is contained in:
jlimolina 2025-05-24 14:37:58 +02:00
commit 27c9515d29
1568 changed files with 252311 additions and 0 deletions

View file

@ -0,0 +1,160 @@
# Copyright (c) 2023, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0, as
# published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an
# additional permission to link the program and your derivative works
# with the separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# Without limiting anything contained in the foregoing, this file,
# which is part of MySQL Connector/Python, is also subject to the
# Universal FOSS Exception, version 1.0, a copy of which can be found at
# http://oss.oracle.com/licenses/universal-foss-exception.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""Base Authentication Plugin class."""
import importlib
from abc import ABC, abstractmethod
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Optional, Type
from ..errors import NotSupportedError, ProgrammingError
from ..logger import logger
if TYPE_CHECKING:
from ..network import MySQLSocket
DEFAULT_PLUGINS_PKG = "mysql.connector.plugins"
class MySQLAuthPlugin(ABC):
"""Authorization plugin interface."""
def __init__(
self,
username: str,
password: str,
ssl_enabled: bool = False,
) -> None:
"""Constructor."""
self._username: str = "" if username is None else username
self._password: str = "" if password is None else password
self._ssl_enabled: bool = ssl_enabled
@property
def ssl_enabled(self) -> bool:
"""Signals whether or not SSL is enabled."""
return self._ssl_enabled
@property
@abstractmethod
def requires_ssl(self) -> bool:
"""Signals whether or not SSL is required."""
@property
@abstractmethod
def name(self) -> str:
"""Plugin official name."""
@abstractmethod
def auth_response(self, auth_data: bytes, **kwargs: Any) -> Optional[bytes]:
"""Make the client's authorization response.
Args:
auth_data: Authorization data.
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Client's authorization response.
"""
def auth_more_response(
self, sock: "MySQLSocket", auth_data: bytes, **kwargs: Any
) -> bytes:
"""Handles server's `auth more data` response.
Args:
sock: Pointer to the socket connection.
auth_data: Authentication method data (from a packet representing
an `auth more data` response).
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Last server's response after back-and-forth communication.
"""
raise NotImplementedError
@abstractmethod
def auth_switch_response(
self, sock: "MySQLSocket", auth_data: bytes, **kwargs: Any
) -> bytes:
"""Handles server's `auth switch request` response.
Args:
sock: Pointer to the socket connection.
auth_data: Plugin provided data (extracted from a packet
representing an `auth switch request` response).
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Last server's response after back-and-forth communication.
"""
@lru_cache(maxsize=10, typed=False)
def get_auth_plugin(
plugin_name: str,
auth_plugin_class: Optional[str] = None,
) -> Type[MySQLAuthPlugin]:
"""Return authentication class based on plugin name
This function returns the class for the authentication plugin plugin_name.
The returned class is a subclass of BaseAuthPlugin.
Args:
plugin_name (str): Authentication plugin name.
auth_plugin_class (str): Authentication plugin class name.
Raises:
NotSupportedError: When plugin_name is not supported.
Returns:
Subclass of `MySQLAuthPlugin`.
"""
package = DEFAULT_PLUGINS_PKG
if plugin_name:
try:
logger.info("package: %s", package)
logger.info("plugin_name: %s", plugin_name)
plugin_module = importlib.import_module(f".{plugin_name}", package)
if not auth_plugin_class or not hasattr(plugin_module, auth_plugin_class):
auth_plugin_class = plugin_module.AUTHENTICATION_PLUGIN_CLASS
logger.info("AUTHENTICATION_PLUGIN_CLASS: %s", auth_plugin_class)
return getattr(plugin_module, auth_plugin_class)
except ModuleNotFoundError as err:
logger.warning("Requested Module was not found: %s", err)
except ValueError as err:
raise ProgrammingError(f"Invalid module name: {err}") from err
raise NotSupportedError(f"Authentication plugin '{plugin_name}' is not supported")

View file

@ -0,0 +1,576 @@
# Copyright (c) 2023, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0, as
# published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an
# additional permission to link the program and your derivative works
# with the separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# Without limiting anything contained in the foregoing, this file,
# which is part of MySQL Connector/Python, is also subject to the
# Universal FOSS Exception, version 1.0, a copy of which can be found at
# http://oss.oracle.com/licenses/universal-foss-exception.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
# mypy: disable-error-code="str-bytes-safe,misc"
"""Kerberos Authentication Plugin."""
import getpass
import os
import struct
from abc import abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, Tuple
from ..authentication import ERR_STATUS
from ..errors import InterfaceError, ProgrammingError
from ..logger import logger
if TYPE_CHECKING:
from ..network import MySQLSocket
try:
import gssapi
except ImportError:
gssapi = None
if os.name != "nt":
raise ProgrammingError(
"Module gssapi is required for GSSAPI authentication "
"mechanism but was not found. Unable to authenticate "
"with the server"
) from None
try:
import sspi
import sspicon
except ImportError:
sspi = None
sspicon = None
from . import MySQLAuthPlugin
AUTHENTICATION_PLUGIN_CLASS = (
"MySQLSSPIKerberosAuthPlugin" if os.name == "nt" else "MySQLKerberosAuthPlugin"
)
class MySQLBaseKerberosAuthPlugin(MySQLAuthPlugin):
"""Base class for the MySQL Kerberos authentication plugin."""
@property
def name(self) -> str:
"""Plugin official name."""
return "authentication_kerberos_client"
@property
def requires_ssl(self) -> bool:
"""Signals whether or not SSL is required."""
return False
@abstractmethod
def auth_continue(
self, tgt_auth_challenge: Optional[bytes]
) -> Tuple[Optional[bytes], bool]:
"""Continue with the Kerberos TGT service request.
With the TGT authentication service given response generate a TGT
service request. This method must be invoked sequentially (in a loop)
until the security context is completed and an empty response needs to
be send to acknowledge the server.
Args:
tgt_auth_challenge: the challenge for the negotiation.
Returns:
tuple (bytearray TGS service request,
bool True if context is completed otherwise False).
"""
def auth_switch_response(
self, sock: "MySQLSocket", auth_data: bytes, **kwargs: Any
) -> bytes:
"""Handles server's `auth switch request` response.
Args:
sock: Pointer to the socket connection.
auth_data: Plugin provided data (extracted from a packet
representing an `auth switch request` response).
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Last server's response after back-and-forth
communication.
"""
logger.debug("# auth_data: %s", auth_data)
response = self.auth_response(auth_data, ignore_auth_data=False, **kwargs)
if response is None:
raise InterfaceError("Got a NULL auth response")
logger.debug("# request: %s size: %s", response, len(response))
sock.send(response)
packet = sock.recv()
logger.debug("# server response packet: %s", packet)
if packet != ERR_STATUS:
rcode_size = 5 # Reader size for the response status code
logger.debug("# Continue with GSSAPI authentication")
logger.debug("# Response header: %s", packet[: rcode_size + 1])
logger.debug("# Response size: %s", len(packet))
logger.debug("# Negotiate a service request")
complete = False
tries = 0
while not complete and tries < 5:
logger.debug("%s Attempt %s %s", "-" * 20, tries + 1, "-" * 20)
logger.debug("<< Server response: %s", packet)
logger.debug("# Response code: %s", packet[: rcode_size + 1])
token, complete = self.auth_continue(packet[rcode_size:])
if token:
sock.send(token)
if complete:
break
packet = sock.recv()
logger.debug(">> Response to server: %s", token)
tries += 1
if not complete:
raise InterfaceError(
f"Unable to fulfill server request after {tries} "
f"attempts. Last server response: {packet}"
)
logger.debug(
"Last response from server: %s length: %d",
packet,
len(packet),
)
# Receive OK packet from server.
packet = sock.recv()
logger.debug("<< Ok packet from server: %s", packet)
return bytes(packet)
# pylint: disable=c-extension-no-member,no-member
class MySQLKerberosAuthPlugin(MySQLBaseKerberosAuthPlugin):
"""Implement the MySQL Kerberos authentication plugin."""
context: Optional[gssapi.SecurityContext] = None
@staticmethod
def get_user_from_credentials() -> str:
"""Get user from credentials without realm."""
try:
creds = gssapi.Credentials(usage="initiate")
user = str(creds.name)
if user.find("@") != -1:
user, _ = user.split("@", 1)
return user
except gssapi.raw.misc.GSSError:
return getpass.getuser()
@staticmethod
def get_store() -> dict:
"""Get a credentials store dictionary.
Returns:
dict: Credentials store dictionary with the krb5 ccache name.
Raises:
InterfaceError: If 'KRB5CCNAME' environment variable is empty.
"""
krb5ccname = os.environ.get(
"KRB5CCNAME",
(
f"/tmp/krb5cc_{os.getuid()}"
if os.name == "posix"
else Path("%TEMP%").joinpath("krb5cc")
),
)
if not krb5ccname:
raise InterfaceError(
"The 'KRB5CCNAME' environment variable is set to empty"
)
logger.debug("Using krb5 ccache name: FILE:%s", krb5ccname)
store = {b"ccache": f"FILE:{krb5ccname}".encode("utf-8")}
return store
def _acquire_cred_with_password(self, upn: str) -> gssapi.raw.creds.Creds:
"""Acquire and store credentials through provided password.
Args:
upn (str): User Principal Name.
Returns:
gssapi.raw.creds.Creds: GSSAPI credentials.
"""
logger.debug("Attempt to acquire credentials through provided password")
user = gssapi.Name(upn, gssapi.NameType.user)
password = self._password.encode("utf-8")
try:
acquire_cred_result = gssapi.raw.acquire_cred_with_password(
user, password, usage="initiate"
)
creds = acquire_cred_result.creds
gssapi.raw.store_cred_into(
self.get_store(),
creds=creds,
mech=gssapi.MechType.kerberos,
overwrite=True,
set_default=True,
)
except gssapi.raw.misc.GSSError as err:
raise ProgrammingError(
f"Unable to acquire credentials with the given password: {err}"
) from err
return creds
@staticmethod
def _parse_auth_data(packet: bytes) -> Tuple[str, str]:
"""Parse authentication data.
Get the SPN and REALM from the authentication data packet.
Format:
SPN string length two bytes <B1> <B2> +
SPN string +
UPN realm string length two bytes <B1> <B2> +
UPN realm string
Returns:
tuple: With 'spn' and 'realm'.
"""
spn_len = struct.unpack("<H", packet[:2])[0]
packet = packet[2:]
spn = struct.unpack(f"<{spn_len}s", packet[:spn_len])[0]
packet = packet[spn_len:]
realm_len = struct.unpack("<H", packet[:2])[0]
realm = struct.unpack(f"<{realm_len}s", packet[2:])[0]
return spn.decode(), realm.decode()
def auth_response(
self, auth_data: Optional[bytes] = None, **kwargs: Any
) -> Optional[bytes]:
"""Prepare the first message to the server."""
spn = None
realm = None
if auth_data and not kwargs.get("ignore_auth_data", True):
try:
spn, realm = self._parse_auth_data(auth_data)
except struct.error as err:
raise InterruptedError(f"Invalid authentication data: {err}") from err
if spn is None:
return self._password.encode() + b"\x00"
upn = f"{self._username}@{realm}" if self._username else None
logger.debug("Service Principal: %s", spn)
logger.debug("Realm: %s", realm)
try:
# Attempt to retrieve credentials from cache file
creds: Any = gssapi.Credentials(usage="initiate")
creds_upn = str(creds.name)
logger.debug("Cached credentials found")
logger.debug("Cached credentials UPN: %s", creds_upn)
# Remove the realm from user
if creds_upn.find("@") != -1:
creds_user, creds_realm = creds_upn.split("@", 1)
else:
creds_user = creds_upn
creds_realm = None
upn = f"{self._username}@{realm}" if self._username else creds_upn
# The user from cached credentials matches with the given user?
if self._username and self._username != creds_user:
logger.debug(
"The user from cached credentials doesn't match with the "
"given user"
)
if self._password is not None:
creds = self._acquire_cred_with_password(upn)
if creds_realm and creds_realm != realm and self._password is not None:
creds = self._acquire_cred_with_password(upn)
except gssapi.raw.exceptions.ExpiredCredentialsError as err:
if upn and self._password is not None:
creds = self._acquire_cred_with_password(upn)
else:
raise InterfaceError(f"Credentials has expired: {err}") from err
except gssapi.raw.misc.GSSError as err:
if upn and self._password is not None:
creds = self._acquire_cred_with_password(upn)
else:
raise InterfaceError(
f"Unable to retrieve cached credentials error: {err}"
) from err
flags = (
gssapi.RequirementFlag.mutual_authentication,
gssapi.RequirementFlag.extended_error,
gssapi.RequirementFlag.delegate_to_peer,
)
name = gssapi.Name(spn, name_type=gssapi.NameType.kerberos_principal)
cname = name.canonicalize(gssapi.MechType.kerberos)
self.context = gssapi.SecurityContext(
name=cname, creds=creds, flags=sum(flags), usage="initiate"
)
try:
initial_client_token: Optional[bytes] = self.context.step()
except gssapi.raw.misc.GSSError as err:
raise InterfaceError(f"Unable to initiate security context: {err}") from err
logger.debug("Initial client token: %s", initial_client_token)
return initial_client_token
def auth_continue(
self, tgt_auth_challenge: Optional[bytes]
) -> Tuple[Optional[bytes], bool]:
"""Continue with the Kerberos TGT service request.
With the TGT authentication service given response generate a TGT
service request. This method must be invoked sequentially (in a loop)
until the security context is completed and an empty response needs to
be send to acknowledge the server.
Args:
tgt_auth_challenge: the challenge for the negotiation.
Returns:
tuple (bytearray TGS service request,
bool True if context is completed otherwise False).
"""
logger.debug("tgt_auth challenge: %s", tgt_auth_challenge)
resp: Optional[bytes] = self.context.step(tgt_auth_challenge)
logger.debug("Context step response: %s", resp)
logger.debug("Context completed?: %s", self.context.complete)
return resp, self.context.complete
def auth_accept_close_handshake(self, message: bytes) -> bytes:
"""Accept handshake and generate closing handshake message for server.
This method verifies the server authenticity from the given message
and included signature and generates the closing handshake for the
server.
When this method is invoked the security context is already established
and the client and server can send GSSAPI formated secure messages.
To finish the authentication handshake the server sends a message
with the security layer availability and the maximum buffer size.
Since the connector only uses the GSSAPI authentication mechanism to
authenticate the user with the server, the server will verify clients
message signature and terminate the GSSAPI authentication and send two
messages; an authentication acceptance b'\x01\x00\x00\x08\x01' and a
OK packet (that must be received after sent the returned message from
this method).
Args:
message: a wrapped gssapi message from the server.
Returns:
bytearray (closing handshake message to be send to the server).
"""
if not self.context.complete:
raise ProgrammingError("Security context is not completed")
logger.debug("Server message: %s", message)
logger.debug("GSSAPI flags in use: %s", self.context.actual_flags)
try:
unwraped = self.context.unwrap(message)
logger.debug("Unwraped: %s", unwraped)
except gssapi.raw.exceptions.BadMICError as err:
logger.debug("Unable to unwrap server message: %s", err)
raise InterfaceError(f"Unable to unwrap server message: {err}") from err
logger.debug("Unwrapped server message: %s", unwraped)
# The message contents for the clients closing message:
# - security level 1 byte, must be always 1.
# - conciliated buffer size 3 bytes, without importance as no
# further GSSAPI messages will be sends.
response = bytearray(b"\x01\x00\x00\00")
# Closing handshake must not be encrypted.
logger.debug("Message response: %s", response)
wraped = self.context.wrap(response, encrypt=False)
logger.debug(
"Wrapped message response: %s, length: %d",
wraped[0],
len(wraped[0]),
)
return wraped.message
class MySQLSSPIKerberosAuthPlugin(MySQLBaseKerberosAuthPlugin):
"""Implement the MySQL Kerberos authentication plugin with Windows SSPI"""
context: Any = None
clientauth: Any = None
@staticmethod
def _parse_auth_data(packet: bytes) -> Tuple[str, str]:
"""Parse authentication data.
Get the SPN and REALM from the authentication data packet.
Format:
SPN string length two bytes <B1> <B2> +
SPN string +
UPN realm string length two bytes <B1> <B2> +
UPN realm string
Returns:
tuple: With 'spn' and 'realm'.
"""
spn_len = struct.unpack("<H", packet[:2])[0]
packet = packet[2:]
spn = struct.unpack(f"<{spn_len}s", packet[:spn_len])[0]
packet = packet[spn_len:]
realm_len = struct.unpack("<H", packet[:2])[0]
realm = struct.unpack(f"<{realm_len}s", packet[2:])[0]
return spn.decode(), realm.decode()
def auth_response(
self, auth_data: Optional[bytes] = None, **kwargs: Any
) -> Optional[bytes]:
"""Prepare the first message to the server.
Args:
kwargs:
ignore_auth_data (bool): if True, the provided auth data is ignored.
"""
logger.debug("auth_response for sspi")
spn = None
realm = None
if auth_data and not kwargs.get("ignore_auth_data", True):
try:
spn, realm = self._parse_auth_data(auth_data)
except struct.error as err:
raise InterruptedError(f"Invalid authentication data: {err}") from err
logger.debug("Service Principal: %s", spn)
logger.debug("Realm: %s", realm)
if sspicon is None or sspi is None:
raise ProgrammingError(
'Package "pywin32" (Python for Win32 (pywin32) extensions)'
" is not installed."
)
flags = (sspicon.ISC_REQ_MUTUAL_AUTH, sspicon.ISC_REQ_DELEGATE)
if self._username and self._password:
_auth_info = (self._username, realm, self._password)
else:
_auth_info = None
targetspn = spn
logger.debug("targetspn: %s", targetspn)
logger.debug("_auth_info is None: %s", _auth_info is None)
# The Security Support Provider Interface (SSPI) is an interface
# that allows us to choose from a set of SSPs available in the
# system; the idea of SSPI is to keep interface consistent no
# matter what back end (a.k.a., SSP) we choose.
# When using SSPI we should not use Kerberos directly as SSP,
# as remarked in [2], but we can use it indirectly via another
# SSP named Negotiate that acts as an application layer between
# SSPI and the other SSPs [1].
# Negotiate can select between Kerberos and NTLM on the fly;
# it chooses Kerberos unless it cannot be used by one of the
# systems involved in the authentication or the calling
# application did not provide sufficient information to use
# Kerberos.
# prefix: https://docs.microsoft.com/en-us/windows/win32/secauthn
# [1] prefix/microsoft-negotiate?source=recommendations
# [2] prefix/microsoft-kerberos?source=recommendations
self.clientauth = sspi.ClientAuth(
"Negotiate",
targetspn=targetspn,
auth_info=_auth_info,
scflags=sum(flags),
datarep=sspicon.SECURITY_NETWORK_DREP,
)
try:
data = None
err, out_buf = self.clientauth.authorize(data)
logger.debug("Context step err: %s", err)
logger.debug("Context step out_buf: %s", out_buf)
logger.debug("Context completed?: %s", self.clientauth.authenticated)
initial_client_token = out_buf[0].Buffer
logger.debug("pkg_info: %s", self.clientauth.pkg_info)
except Exception as err:
raise InterfaceError(f"Unable to initiate security context: {err}") from err
logger.debug("Initial client token: %s", initial_client_token)
return initial_client_token
def auth_continue(
self, tgt_auth_challenge: Optional[bytes]
) -> Tuple[Optional[bytes], bool]:
"""Continue with the Kerberos TGT service request.
With the TGT authentication service given response generate a TGT
service request. This method must be invoked sequentially (in a loop)
until the security context is completed and an empty response needs to
be send to acknowledge the server.
Args:
tgt_auth_challenge: the challenge for the negotiation.
Returns:
tuple (bytearray TGS service request,
bool True if context is completed otherwise False).
"""
logger.debug("tgt_auth challenge: %s", tgt_auth_challenge)
err, out_buf = self.clientauth.authorize(tgt_auth_challenge)
logger.debug("Context step err: %s", err)
logger.debug("Context step out_buf: %s", out_buf)
resp = out_buf[0].Buffer
logger.debug("Context step resp: %s", resp)
logger.debug("Context completed?: %s", self.clientauth.authenticated)
return resp, self.clientauth.authenticated

View file

@ -0,0 +1,595 @@
# Copyright (c) 2023, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0, as
# published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an
# additional permission to link the program and your derivative works
# with the separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# Without limiting anything contained in the foregoing, this file,
# which is part of MySQL Connector/Python, is also subject to the
# Universal FOSS Exception, version 1.0, a copy of which can be found at
# http://oss.oracle.com/licenses/universal-foss-exception.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""LDAP SASL Authentication Plugin."""
import hmac
from base64 import b64decode, b64encode
from hashlib import sha1, sha256
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple
from uuid import uuid4
from ..authentication import ERR_STATUS
from ..errors import InterfaceError, ProgrammingError
from ..logger import logger
from ..types import StrOrBytes
if TYPE_CHECKING:
from ..network import MySQLSocket
try:
import gssapi
except ImportError:
raise ProgrammingError(
"Module gssapi is required for GSSAPI authentication "
"mechanism but was not found. Unable to authenticate "
"with the server"
) from None
from ..utils import (
normalize_unicode_string as norm_ustr,
validate_normalized_unicode_string as valid_norm,
)
from . import MySQLAuthPlugin
AUTHENTICATION_PLUGIN_CLASS = "MySQLLdapSaslPasswordAuthPlugin"
# pylint: disable=c-extension-no-member,no-member
class MySQLLdapSaslPasswordAuthPlugin(MySQLAuthPlugin):
"""Class implementing the MySQL ldap sasl authentication plugin.
The MySQL's ldap sasl authentication plugin support two authentication
methods SCRAM-SHA-1 and GSSAPI (using Kerberos). This implementation only
support SCRAM-SHA-1 and SCRAM-SHA-256.
SCRAM-SHA-1 amd SCRAM-SHA-256
This method requires 2 messages from client and 2 responses from
server.
The first message from client will be generated by prepare_password(),
after receive the response from the server, it is required that this
response is passed back to auth_continue() which will return the
second message from the client. After send this second message to the
server, the second server respond needs to be passed to auth_finalize()
to finish the authentication process.
"""
sasl_mechanisms: List[str] = ["SCRAM-SHA-1", "SCRAM-SHA-256", "GSSAPI"]
def_digest_mode: Callable = sha1
client_nonce: Optional[str] = None
client_salt: Any = None
server_salt: Optional[str] = None
krb_service_principal: Optional[str] = None
iterations: int = 0
server_auth_var: Optional[str] = None
target_name: Optional[gssapi.Name] = None
ctx: gssapi.SecurityContext = None
servers_first: Optional[str] = None
server_nonce: Optional[str] = None
@staticmethod
def _xor(bytes1: bytes, bytes2: bytes) -> bytes:
return bytes([b1 ^ b2 for b1, b2 in zip(bytes1, bytes2)])
def _hmac(self, password: bytes, salt: bytes) -> bytes:
digest_maker = hmac.new(password, salt, self.def_digest_mode)
return digest_maker.digest()
def _hi(self, password: str, salt: bytes, count: int) -> bytes:
"""Prepares Hi
Hi(password, salt, iterations) where Hi(p,s,i) is defined as
PBKDF2 (HMAC, p, s, i, output length of H).
"""
pw = password.encode()
hi = self._hmac(pw, salt + b"\x00\x00\x00\x01")
aux = hi
for _ in range(count - 1):
aux = self._hmac(pw, aux)
hi = self._xor(hi, aux)
return hi
@staticmethod
def _normalize(string: str) -> str:
norm_str = norm_ustr(string)
broken_rule = valid_norm(norm_str)
if broken_rule is not None:
raise InterfaceError(f"broken_rule: {broken_rule}")
return norm_str
def _first_message(self) -> bytes:
"""This method generates the first message to the server to start the
The client-first message consists of a gs2-header,
the desired username, and a randomly generated client nonce cnonce.
The first message from the server has the form:
b'n,a=<user_name>,n=<user_name>,r=<client_nonce>
Returns client's first message
"""
cfm_fprnat = "n,a={user_name},n={user_name},r={client_nonce}"
self.client_nonce = str(uuid4()).replace("-", "")
cfm: StrOrBytes = cfm_fprnat.format(
user_name=self._normalize(self._username),
client_nonce=self.client_nonce,
)
if isinstance(cfm, str):
cfm = cfm.encode("utf8")
return cfm
def _first_message_krb(self) -> Optional[bytes]:
"""Get a TGT Authentication request and initiates security context.
This method will contact the Kerberos KDC in order of obtain a TGT.
"""
user_name = gssapi.raw.names.import_name(
self._username.encode("utf8"), name_type=gssapi.NameType.user
)
# Use defaults store = {'ccache': 'FILE:/tmp/krb5cc_1000'}#,
# 'keytab':'/etc/some.keytab' }
# Attempt to retrieve credential from default cache file.
try:
cred: Any = gssapi.Credentials()
logger.debug(
"# Stored credentials found, if password was given it will be ignored."
)
try:
# validate credentials has not expired.
cred.lifetime
except gssapi.raw.exceptions.ExpiredCredentialsError as err:
logger.warning(" Credentials has expired: %s", err)
cred.acquire(user_name)
raise InterfaceError(f"Credentials has expired: {err}") from err
except gssapi.raw.misc.GSSError as err:
if not self._password:
raise InterfaceError(
f"Unable to retrieve stored credentials error: {err}"
) from err
try:
logger.debug("# Attempt to retrieve credentials with given password")
acquire_cred_result = gssapi.raw.acquire_cred_with_password(
user_name,
self._password.encode("utf8"),
usage="initiate",
)
cred = acquire_cred_result[0]
except gssapi.raw.misc.GSSError as err2:
raise ProgrammingError(
f"Unable to retrieve credentials with the given password: {err2}"
) from err
flags_l = (
gssapi.RequirementFlag.mutual_authentication,
gssapi.RequirementFlag.extended_error,
gssapi.RequirementFlag.delegate_to_peer,
)
if self.krb_service_principal:
service_principal = self.krb_service_principal
else:
service_principal = "ldap/ldapauth"
logger.debug("# service principal: %s", service_principal)
servk = gssapi.Name(
service_principal, name_type=gssapi.NameType.kerberos_principal
)
self.target_name = servk
self.ctx = gssapi.SecurityContext(
name=servk, creds=cred, flags=sum(flags_l), usage="initiate"
)
try:
# step() returns bytes | None, see documentation,
# so this method could return a NULL payload.
# ref: https://pythongssapi.github.io/<suffix>
# suffix: python-gssapi/latest/gssapi.html#gssapi.sec_contexts.SecurityContext
initial_client_token = self.ctx.step()
except gssapi.raw.misc.GSSError as err:
raise InterfaceError(f"Unable to initiate security context: {err}") from err
logger.debug("# initial client token: %s", initial_client_token)
return initial_client_token
def auth_continue_krb(
self, tgt_auth_challenge: Optional[bytes]
) -> Tuple[Optional[bytes], bool]:
"""Continue with the Kerberos TGT service request.
With the TGT authentication service given response generate a TGT
service request. This method must be invoked sequentially (in a loop)
until the security context is completed and an empty response needs to
be send to acknowledge the server.
Args:
tgt_auth_challenge the challenge for the negotiation.
Returns: tuple (bytearray TGS service request,
bool True if context is completed otherwise False).
"""
logger.debug("tgt_auth challenge: %s", tgt_auth_challenge)
resp = self.ctx.step(tgt_auth_challenge)
logger.debug("# context step response: %s", resp)
logger.debug("# context completed?: %s", self.ctx.complete)
return resp, self.ctx.complete
def auth_accept_close_handshake(self, message: bytes) -> bytes:
"""Accept handshake and generate closing handshake message for server.
This method verifies the server authenticity from the given message
and included signature and generates the closing handshake for the
server.
When this method is invoked the security context is already established
and the client and server can send GSSAPI formated secure messages.
To finish the authentication handshake the server sends a message
with the security layer availability and the maximum buffer size.
Since the connector only uses the GSSAPI authentication mechanism to
authenticate the user with the server, the server will verify clients
message signature and terminate the GSSAPI authentication and send two
messages; an authentication acceptance b'\x01\x00\x00\x08\x01' and a
OK packet (that must be received after sent the returned message from
this method).
Args:
message a wrapped hssapi message from the server.
Returns: bytearray closing handshake message to be send to the server.
"""
if not self.ctx.complete:
raise ProgrammingError("Security context is not completed.")
logger.debug("# servers message: %s", message)
logger.debug("# GSSAPI flags in use: %s", self.ctx.actual_flags)
try:
unwraped = self.ctx.unwrap(message)
logger.debug("# unwraped: %s", unwraped)
except gssapi.raw.exceptions.BadMICError as err:
raise InterfaceError(f"Unable to unwrap server message: {err}") from err
logger.debug("# unwrapped server message: %s", unwraped)
# The message contents for the clients closing message:
# - security level 1 byte, must be always 1.
# - conciliated buffer size 3 bytes, without importance as no
# further GSSAPI messages will be sends.
response = bytearray(b"\x01\x00\x00\00")
# Closing handshake must not be encrypted.
logger.debug("# message response: %s", response)
wraped = self.ctx.wrap(response, encrypt=False)
logger.debug(
"# wrapped message response: %s, length: %d",
wraped[0],
len(wraped[0]),
)
return wraped.message
def auth_response(
self,
auth_data: bytes,
**kwargs: Any,
) -> Optional[bytes]:
"""This method will prepare the fist message to the server.
Returns bytes to send to the server as the first message.
"""
# pylint: disable=attribute-defined-outside-init
self._auth_data = auth_data
auth_mechanism = self._auth_data.decode()
logger.debug("read_method_name_from_server: %s", auth_mechanism)
if auth_mechanism not in self.sasl_mechanisms:
auth_mechanisms = '", "'.join(self.sasl_mechanisms[:-1])
raise InterfaceError(
f'The sasl authentication method "{auth_mechanism}" requested '
f'from the server is not supported. Only "{auth_mechanisms}" '
f'and "{self.sasl_mechanisms[-1]}" are supported'
)
if b"GSSAPI" in self._auth_data:
return self._first_message_krb()
if self._auth_data == b"SCRAM-SHA-256":
self.def_digest_mode = sha256
return self._first_message()
def _second_message(self) -> bytes:
"""This method generates the second message to the server
Second message consist on the concatenation of the client and the
server nonce, and cproof.
c=<n,a=<user_name>>,r=<server_nonce>,p=<client_proof>
where:
<client_proof>: xor(<client_key>, <client_signature>)
<client_key>: hmac(salted_password, b"Client Key")
<client_signature>: hmac(<stored_key>, <auth_msg>)
<stored_key>: h(<client_key>)
<auth_msg>: <client_first_no_header>,<servers_first>,
c=<client_header>,r=<server_nonce>
<client_first_no_header>: n=<username>r=<client_nonce>
"""
if not self._auth_data:
raise InterfaceError("Missing authentication data (seed)")
passw = self._normalize(self._password)
salted_password = self._hi(passw, b64decode(self.server_salt), self.iterations)
logger.debug("salted_password: %s", b64encode(salted_password).decode())
client_key = self._hmac(salted_password, b"Client Key")
logger.debug("client_key: %s", b64encode(client_key).decode())
stored_key = self.def_digest_mode(client_key).digest()
logger.debug("stored_key: %s", b64encode(stored_key).decode())
server_key = self._hmac(salted_password, b"Server Key")
logger.debug("server_key: %s", b64encode(server_key).decode())
client_first_no_header = ",".join(
[
f"n={self._normalize(self._username)}",
f"r={self.client_nonce}",
]
)
logger.debug("client_first_no_header: %s", client_first_no_header)
client_header = b64encode(
f"n,a={self._normalize(self._username)},".encode()
).decode()
auth_msg = ",".join(
[
client_first_no_header,
self.servers_first,
f"c={client_header}",
f"r={self.server_nonce}",
]
)
logger.debug("auth_msg: %s", auth_msg)
client_signature = self._hmac(stored_key, auth_msg.encode())
logger.debug("client_signature: %s", b64encode(client_signature).decode())
client_proof = self._xor(client_key, client_signature)
logger.debug("client_proof: %s", b64encode(client_proof).decode())
self.server_auth_var = b64encode(
self._hmac(server_key, auth_msg.encode())
).decode()
logger.debug("server_auth_var: %s", self.server_auth_var)
msg = ",".join(
[
f"c={client_header}",
f"r={self.server_nonce}",
f"p={b64encode(client_proof).decode()}",
]
)
logger.debug("second_message: %s", msg)
return msg.encode()
def _validate_first_reponse(self, servers_first: bytes) -> None:
"""Validates first message from the server.
Extracts the server's salt and iterations from the servers 1st response.
First message from the server is in the form:
<server_salt>,i=<iterations>
"""
if not servers_first or not isinstance(servers_first, (bytearray, bytes)):
raise InterfaceError(f"Unexpected server message: {repr(servers_first)}")
try:
servers_first_str = servers_first.decode()
self.servers_first = servers_first_str
r_server_nonce, s_salt, i_counter = servers_first_str.split(",")
except ValueError:
raise InterfaceError(
f"Unexpected server message: {servers_first_str}"
) from None
if (
not r_server_nonce.startswith("r=")
or not s_salt.startswith("s=")
or not i_counter.startswith("i=")
):
raise InterfaceError(
f"Incomplete reponse from the server: {servers_first_str}"
)
if self.client_nonce in r_server_nonce:
self.server_nonce = r_server_nonce[2:]
logger.debug("server_nonce: %s", self.server_nonce)
else:
raise InterfaceError(
"Unable to authenticate response: response not well formed "
f"{servers_first_str}"
)
self.server_salt = s_salt[2:]
logger.debug(
"server_salt: %s length: %s",
self.server_salt,
len(self.server_salt),
)
try:
i_counter = i_counter[2:]
logger.debug("iterations: %s", i_counter)
self.iterations = int(i_counter)
except Exception as err:
raise InterfaceError(
f"Unable to authenticate: iterations not found {servers_first_str}"
) from err
def auth_continue(self, servers_first_response: bytes) -> bytes:
"""return the second message from the client.
Returns bytes to send to the server as the second message.
"""
self._validate_first_reponse(servers_first_response)
return self._second_message()
def _validate_second_reponse(self, servers_second: bytearray) -> bool:
"""Validates second message from the server.
The client and the server prove to each other they have the same Auth
variable.
The second message from the server consist of the server's proof:
server_proof = HMAC(<server_key>, <auth_msg>)
where:
<server_key>: hmac(<salted_password>, b"Server Key")
<auth_msg>: <client_first_no_header>,<servers_first>,
c=<client_header>,r=<server_nonce>
Our server_proof must be equal to the Auth variable send on this second
response.
"""
if (
not servers_second
or not isinstance(servers_second, bytearray)
or len(servers_second) <= 2
or not servers_second.startswith(b"v=")
):
raise InterfaceError("The server's proof is not well formated")
server_var = servers_second[2:].decode()
logger.debug("server auth variable: %s", server_var)
return self.server_auth_var == server_var
def auth_finalize(self, servers_second_response: bytearray) -> bool:
"""finalize the authentication process.
Raises InterfaceError if the ervers_second_response is invalid.
Returns True in successful authentication False otherwise.
"""
if not self._validate_second_reponse(servers_second_response):
raise InterfaceError(
"Authentication failed: Unable to proof server identity"
)
return True
@property
def name(self) -> str:
"""Plugin official name."""
return "authentication_ldap_sasl_client"
@property
def requires_ssl(self) -> bool:
"""Signals whether or not SSL is required."""
return False
def auth_switch_response(
self, sock: "MySQLSocket", auth_data: bytes, **kwargs: Any
) -> bytes:
"""Handles server's `auth switch request` response.
Args:
sock: Pointer to the socket connection.
auth_data: Plugin provided data (extracted from a packet
representing an `auth switch request` response).
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Last server's response after back-and-forth
communication.
"""
logger.debug("# auth_data: %s", auth_data)
self.krb_service_principal = kwargs.get("krb_service_principal")
response = self.auth_response(auth_data, **kwargs)
if response is None:
raise InterfaceError("Got a NULL auth response")
logger.debug("# request: %s size: %s", response, len(response))
sock.send(response)
packet = sock.recv()
logger.debug("# server response packet: %s", packet)
if len(packet) >= 6 and packet[5] == 114 and packet[6] == 61: # 'r' and '='
# Continue with sasl authentication
dec_response = packet[5:]
cresponse = self.auth_continue(dec_response)
sock.send(cresponse)
packet = sock.recv()
if packet[5] == 118 and packet[6] == 61: # 'v' and '='
if self.auth_finalize(packet[5:]):
# receive packed OK
packet = sock.recv()
elif auth_data == b"GSSAPI" and packet[4] != ERR_STATUS:
rcode_size = 5 # header size for the response status code.
logger.debug("# Continue with sasl GSSAPI authentication")
logger.debug("# response header: %s", packet[: rcode_size + 1])
logger.debug("# response size: %s", len(packet))
logger.debug("# Negotiate a service request")
complete = False
tries = 0 # To avoid a infinite loop attempt no more than feedback messages
while not complete and tries < 5:
logger.debug("%s Attempt %s %s", "-" * 20, tries + 1, "-" * 20)
logger.debug("<< server response: %s", packet)
logger.debug("# response code: %s", packet[: rcode_size + 1])
step, complete = self.auth_continue_krb(packet[rcode_size:])
logger.debug(" >> response to server: %s", step)
sock.send(step or b"")
packet = sock.recv()
tries += 1
if not complete:
raise InterfaceError(
f"Unable to fulfill server request after {tries} "
f"attempts. Last server response: {packet}"
)
logger.debug(
" last GSSAPI response from server: %s length: %d",
packet,
len(packet),
)
last_step = self.auth_accept_close_handshake(packet[rcode_size:])
logger.debug(
" >> last response to server: %s length: %d",
last_step,
len(last_step),
)
sock.send(last_step)
# Receive final handshake from server
packet = sock.recv()
logger.debug("<< final handshake from server: %s", packet)
# receive OK packet from server.
packet = sock.recv()
logger.debug("<< ok packet from server: %s", packet)
return bytes(packet)
# pylint: enable=c-extension-no-member,no-member

View file

@ -0,0 +1,234 @@
# Copyright (c) 2023, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0, as
# published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an
# additional permission to link the program and your derivative works
# with the separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# Without limiting anything contained in the foregoing, this file,
# which is part of MySQL Connector/Python, is also subject to the
# Universal FOSS Exception, version 1.0, a copy of which can be found at
# http://oss.oracle.com/licenses/universal-foss-exception.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
# mypy: disable-error-code="arg-type,union-attr,call-arg"
"""OCI Authentication Plugin."""
import json
import os
from base64 import b64encode
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional
from .. import errors
from ..logger import logger
if TYPE_CHECKING:
from ..network import MySQLSocket
try:
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric.types import PRIVATE_KEY_TYPES
except ImportError:
raise errors.ProgrammingError("Package 'cryptography' is not installed") from None
try:
from oci import config, exceptions
except ImportError:
raise errors.ProgrammingError(
"Package 'oci' (Oracle Cloud Infrastructure Python SDK) is not installed"
) from None
from . import MySQLAuthPlugin
AUTHENTICATION_PLUGIN_CLASS = "MySQLOCIAuthPlugin"
OCI_SECURITY_TOKEN_MAX_SIZE = 10 * 1024 # In bytes
OCI_SECURITY_TOKEN_TOO_LARGE = "Ephemeral security token is too large (10KB max)"
OCI_SECURITY_TOKEN_FILE_NOT_AVAILABLE = (
"Ephemeral security token file ('security_token_file') could not be read"
)
OCI_PROFILE_MISSING_PROPERTIES = (
"OCI configuration file does not contain a 'fingerprint' or 'key_file' entry"
)
class MySQLOCIAuthPlugin(MySQLAuthPlugin):
"""Implement the MySQL OCI IAM authentication plugin."""
context: Any = None
oci_config_profile: str = "DEFAULT"
oci_config_file: str = config.DEFAULT_LOCATION
@staticmethod
def _prepare_auth_response(signature: bytes, oci_config: Dict[str, Any]) -> str:
"""Prepare client's authentication response
Prepares client's authentication response in JSON format
Args:
signature (bytes): server's nonce to be signed by client.
oci_config (dict): OCI configuration object.
Returns:
str: JSON string with the following format:
{"fingerprint": str, "signature": str, "token": base64.base64.base64}
Raises:
ProgrammingError: If the ephemeral security token file can't be open or the
token is too large.
"""
signature_64 = b64encode(signature)
auth_response = {
"fingerprint": oci_config["fingerprint"],
"signature": signature_64.decode(),
}
# The security token, if it exists, should be a JWT (JSON Web Token), consisted
# of a base64-encoded header, body, and signature, separated by '.',
# e.g. "Base64.Base64.Base64", stored in a file at the path specified by the
# security_token_file configuration property
if oci_config.get("security_token_file"):
try:
security_token_file = Path(oci_config["security_token_file"])
# Check if token exceeds the maximum size
if security_token_file.stat().st_size > OCI_SECURITY_TOKEN_MAX_SIZE:
raise errors.ProgrammingError(OCI_SECURITY_TOKEN_TOO_LARGE)
auth_response["token"] = security_token_file.read_text(encoding="utf-8")
except (OSError, UnicodeError) as err:
raise errors.ProgrammingError(
OCI_SECURITY_TOKEN_FILE_NOT_AVAILABLE
) from err
return json.dumps(auth_response, separators=(",", ":"))
@staticmethod
def _get_private_key(key_path: str) -> PRIVATE_KEY_TYPES:
"""Get the private_key form the given location"""
try:
with open(os.path.expanduser(key_path), "rb") as key_file:
private_key = serialization.load_pem_private_key(
key_file.read(),
password=None,
)
except (TypeError, OSError, ValueError, UnsupportedAlgorithm) as err:
raise errors.ProgrammingError(
"An error occurred while reading the API_KEY from "
f'"{key_path}": {err}'
)
return private_key
def _get_valid_oci_config(self) -> Dict[str, Any]:
"""Get a valid OCI config from the given configuration file path"""
error_list = []
req_keys = {
"fingerprint": (lambda x: len(x) > 32),
"key_file": (lambda x: os.path.exists(os.path.expanduser(x))),
}
oci_config: Dict[str, Any] = {}
try:
# key_file is validated by oci.config if present
oci_config = config.from_file(
self.oci_config_file or config.DEFAULT_LOCATION,
self.oci_config_profile or "DEFAULT",
)
for req_key, req_value in req_keys.items():
try:
# Verify parameter in req_key is present and valid
if oci_config[req_key] and not req_value(oci_config[req_key]):
error_list.append(f'Parameter "{req_key}" is invalid')
except KeyError:
error_list.append(f"Does not contain parameter {req_key}")
except (
exceptions.ConfigFileNotFound,
exceptions.InvalidConfig,
exceptions.InvalidKeyFilePath,
exceptions.InvalidPrivateKey,
exceptions.ProfileNotFound,
) as err:
error_list.append(str(err))
# Raise errors if any
if error_list:
raise errors.ProgrammingError(
f"Invalid oci-config-file: {self.oci_config_file}. "
f"Errors found: {error_list}"
)
return oci_config
@property
def name(self) -> str:
"""Plugin official name."""
return "authentication_oci_client"
@property
def requires_ssl(self) -> bool:
"""Signals whether or not SSL is required."""
return False
def auth_response(self, auth_data: bytes, **kwargs: Any) -> Optional[bytes]:
"""Prepare authentication string for the server."""
logger.debug("server nonce: %s, len %d", auth_data, len(auth_data))
oci_config = self._get_valid_oci_config()
private_key = self._get_private_key(oci_config["key_file"])
signature = private_key.sign(auth_data, padding.PKCS1v15(), hashes.SHA256())
auth_response = self._prepare_auth_response(signature, oci_config)
logger.debug("authentication response: %s", auth_response)
return auth_response.encode()
def auth_switch_response(
self, sock: "MySQLSocket", auth_data: bytes, **kwargs: Any
) -> bytes:
"""Handles server's `auth switch request` response.
Args:
sock: Pointer to the socket connection.
auth_data: Plugin provided data (extracted from a packet
representing an `auth switch request` response).
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Last server's response after back-and-forth
communication.
"""
self.oci_config_file = kwargs.get("oci_config_file", "DEFAULT")
self.oci_config_profile = kwargs.get(
"oci_config_profile", config.DEFAULT_LOCATION
)
logger.debug("# oci configuration file path: %s", self.oci_config_file)
response = self.auth_response(auth_data, **kwargs)
if response is None:
raise errors.InterfaceError("Got a NULL auth response")
logger.debug("# request: %s size: %s", response, len(response))
sock.send(response)
packet = sock.recv()
logger.debug("# server response packet: %s", packet)
return bytes(packet)

View file

@ -0,0 +1,173 @@
# Copyright (c) 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0, as
# published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an
# additional permission to link the program and your derivative works
# with the separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# Without limiting anything contained in the foregoing, this file,
# which is part of MySQL Connector/Python, is also subject to the
# Universal FOSS Exception, version 1.0, a copy of which can be found at
# http://oss.oracle.com/licenses/universal-foss-exception.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""OpenID Authentication Plugin."""
import re
from pathlib import Path
from typing import Any, List
from mysql.connector import utils
from .. import errors
from ..logger import logger
from ..network import MySQLSocket
from . import MySQLAuthPlugin
AUTHENTICATION_PLUGIN_CLASS = "MySQLOpenIDConnectAuthPlugin"
OPENID_TOKEN_MAX_SIZE = 10 * 1024 # In bytes
class MySQLOpenIDConnectAuthPlugin(MySQLAuthPlugin):
"""Class implementing the MySQL OpenID Connect Authentication Plugin."""
_openid_capability_flag: bytes = utils.int1store(1)
@property
def name(self) -> str:
"""Plugin official name."""
return "authentication_openid_connect_client"
@property
def requires_ssl(self) -> bool:
"""Signals whether or not SSL is required."""
return True
@staticmethod
def _validate_openid_token(token: str) -> bool:
"""Helper method used to validate OpenID Connect token
The Token is represented as a JSON Web Token (JWT) consists of a
base64-encoded header, body, and signature, separated by '.' e.g.,
"Base64url.Base64url.Base64url". The First part of the token contains
the header, the second part contains payload and the third part contains
signature. These token parts should be Base64 URLSafe i.e., Token cannot
contain characters other than a-z, A-Z, 0-9 and special characters '-', '_'.
Args:
token (str): Base64url-encoded OpenID connect token fetched from
the file path passed via `openid_token_file` connection
argument.
Returns:
bool: Signal indicating whether the token is valid or not.
"""
header_payload_sig: List[str] = token.split(".")
if len(header_payload_sig) != 3:
# invalid structure
return False
urlsafe_pattern = re.compile("^[a-zA-Z0-9-_]*$")
return all(
(
len(token_part) and urlsafe_pattern.search(token_part) is not None
for token_part in header_payload_sig
)
)
def auth_response(self, auth_data: bytes, **kwargs: Any) -> bytes:
"""Prepares authentication string for the server.
Args:
auth_data: Authorization data.
kwargs: Custom configuration to be passed to the auth plugin
when invoked.
Returns:
packet: Client's authorization response.
The OpenID Connect authorization response follows the pattern :-
int<1> capability flag
string<lenenc> id token
Raises:
InterfaceError: If the connection is insecure or the OpenID Token is too large,
invalid or non-existent.
ProgrammingError: If the OpenID Token file could not be read.
"""
try:
# Check if the connection is secure
if self.requires_ssl and not self._ssl_enabled:
raise errors.InterfaceError(f"{self.name} requires SSL")
# Validate the file
token_file_path: str = kwargs.get("openid_token_file", None)
openid_token_file: Path = Path(token_file_path)
# Check if token exceeds the maximum size
if openid_token_file.stat().st_size > OPENID_TOKEN_MAX_SIZE:
raise errors.InterfaceError(
"The OpenID Connect token file size is too large (> 10KB)"
)
openid_token: str = openid_token_file.read_text(encoding="utf-8")
openid_token = openid_token.strip()
# Validate the JWT Token
if not self._validate_openid_token(openid_token):
raise errors.InterfaceError("The OpenID Connect Token is invalid")
# build the auth_response packet
auth_response: List[bytes] = [
self._openid_capability_flag,
utils.lc_int(len(openid_token)),
openid_token.encode(),
]
return b"".join(auth_response)
except (SyntaxError, TypeError, OSError, UnicodeError) as err:
raise errors.ProgrammingError(
"The OpenID Connect Token File (openid_token_file) could not be read"
) from err
def auth_switch_response(
self, sock: MySQLSocket, auth_data: bytes, **kwargs: Any
) -> bytes:
"""Handles server's `auth switch request` response.
Args:
sock: Pointer to the socket connection.
auth_data: Plugin provided data (extracted from a packet
representing an `auth switch request` response).
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Last server's response after back-and-forth
communication.
Raises:
InterfaceError: If a NULL auth response is received from auth_response method.
"""
response = self.auth_response(auth_data, **kwargs)
if response is None:
raise errors.InterfaceError("Got a NULL auth response")
logger.debug("# request: %s size: %s", response, len(response))
sock.send(response)
packet = sock.recv()
logger.debug("# server response packet: %s", packet)
return bytes(packet)

View file

@ -0,0 +1,290 @@
# Copyright (c) 2023, 2025, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0, as
# published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an
# additional permission to link the program and your derivative works
# with the separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# Without limiting anything contained in the foregoing, this file,
# which is part of MySQL Connector/Python, is also subject to the
# Universal FOSS Exception, version 1.0, a copy of which can be found at
# http://oss.oracle.com/licenses/universal-foss-exception.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""WebAuthn Authentication Plugin."""
from typing import TYPE_CHECKING, Any, Callable, Optional
from .. import errors, utils
from ..logger import logger
from . import MySQLAuthPlugin
if TYPE_CHECKING:
from ..network import MySQLSocket
try:
from fido2.cbor import dump_bytes as cbor_dump_bytes
from fido2.client import Fido2Client, UserInteraction
from fido2.hid import CtapHidDevice
from fido2.webauthn import PublicKeyCredentialRequestOptions
except ImportError as import_err:
raise errors.ProgrammingError(
"Module fido2 is required for WebAuthn authentication mechanism but was "
"not found. Unable to authenticate with the server"
) from import_err
try:
from fido2.pcsc import CtapPcscDevice
CTAP_PCSC_DEVICE_AVAILABLE = True
except ModuleNotFoundError:
CTAP_PCSC_DEVICE_AVAILABLE = False
AUTHENTICATION_PLUGIN_CLASS = "MySQLWebAuthnAuthPlugin"
class ClientInteraction(UserInteraction):
"""Provides user interaction to the Client."""
def __init__(self, callback: Optional[Callable] = None):
self.callback = callback
self.msg = (
"Please insert FIDO device and perform gesture action for authentication "
"to complete."
)
def prompt_up(self) -> None:
"""Prompt message for the user interaction with the FIDO device."""
if self.callback is None:
print(self.msg)
else:
self.callback(self.msg)
class MySQLWebAuthnAuthPlugin(MySQLAuthPlugin):
"""Class implementing the MySQL WebAuthn authentication plugin."""
client: Optional[Fido2Client] = None
callback: Optional[Callable] = None
options: dict = {"rpId": None, "challenge": None, "allowCredentials": []}
@property
def name(self) -> str:
"""Plugin official name."""
return "authentication_webauthn_client"
@property
def requires_ssl(self) -> bool:
"""Signals whether or not SSL is required."""
return False
def get_assertion_response(
self, credential_id: Optional[bytearray] = None
) -> bytes:
"""Get assertion from authenticator and return the response.
Args:
credential_id (Optional[bytearray]): The credential ID.
Returns:
bytearray: The response packet with the data from the assertion.
"""
if self.client is None:
raise errors.InterfaceError("No WebAuthn client found")
if credential_id is not None:
# If credential_id is not None, it's because the FIDO device does not
# support resident keys and the credential_id was requested from the server
self.options["allowCredentials"] = [
{
"id": credential_id,
"type": "public-key",
}
]
# Get assertion from authenticator
assertion = self.client.get_assertion(
PublicKeyCredentialRequestOptions.from_dict(self.options)
)
number_of_assertions = len(assertion.get_assertions())
client_data_json = b""
# Build response packet
#
# Format:
# int<1> 0x02 (2) status tag
# int<lenenc> number of assertions length encoded number of assertions
# string authenticator data variable length raw binary string
# string signed challenge variable length raw binary string
# ...
# ...
# string authenticator data variable length raw binary string
# string signed challenge variable length raw binary string
# string ClientDataJSON variable length raw binary string
packet = utils.lc_int(2)
packet += utils.lc_int(number_of_assertions)
# Add authenticator data and signed challenge for each assertion
for i in range(number_of_assertions):
assertion_response = assertion.get_response(i)
# string<lenenc> authenticator_data
authenticator_data = cbor_dump_bytes(assertion_response.authenticator_data)
# string<lenenc> signed_challenge
signature = assertion_response.signature
packet += utils.lc_int(len(authenticator_data))
packet += authenticator_data
packet += utils.lc_int(len(signature))
packet += signature
# string<lenenc> client_data_json
client_data_json = assertion_response.client_data
packet += utils.lc_int(len(client_data_json))
packet += client_data_json
logger.debug("WebAuthn - payload response packet: %s", packet)
return packet
def auth_response(self, auth_data: bytes, **kwargs: Any) -> Optional[bytes]:
"""Find authenticator device and check if supports resident keys.
It also creates a Fido2Client using the relying party ID from the server.
Raises:
InterfaceError: When the FIDO device is not found.
Returns:
bytes: 2 if the authenticator supports resident keys else 1.
"""
try:
packets, capability = utils.read_int(auth_data, 1)
challenge, rp_id = utils.read_lc_string_list(packets)
self.options["challenge"] = challenge
self.options["rpId"] = rp_id.decode()
logger.debug("WebAuthn - capability: %d", capability)
logger.debug("WebAuthn - challenge: %s", self.options["challenge"])
logger.debug("WebAuthn - relying party id: %s", self.options["rpId"])
except ValueError as err:
raise errors.InterfaceError(
"Unable to parse MySQL WebAuthn authentication data"
) from err
# Locate a device
device = next(CtapHidDevice.list_devices(), None)
if device is not None:
logger.debug("WebAuthn - Use USB HID channel")
elif CTAP_PCSC_DEVICE_AVAILABLE:
device = next(CtapPcscDevice.list_devices(), None) # type: ignore[arg-type]
if device is None:
raise errors.InterfaceError("No FIDO device found")
# Set up a FIDO 2 client using the origin relying party id
self.client = Fido2Client(
device,
f"https://{self.options['rpId']}",
user_interaction=ClientInteraction(self.callback),
)
if not self.client.info.options.get("rk"):
logger.debug("WebAuthn - Authenticator doesn't support resident keys")
return b"1"
logger.debug("WebAuthn - Authenticator with support for resident key found")
return b"2"
def auth_more_response(
self, sock: "MySQLSocket", auth_data: bytes, **kwargs: Any
) -> bytes:
"""Handles server's `auth more data` response.
Args:
sock: Pointer to the socket connection.
auth_data: Authentication method data (from a packet representing
an `auth more data` response).
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Last server's response after back-and-forth
communication.
"""
_, credential_id = utils.read_lc_string(auth_data)
response = self.get_assertion_response(credential_id) # type: ignore[arg-type]
logger.debug("WebAuthn - request: %s size: %s", response, len(response))
sock.send(response)
pkt = bytes(sock.recv())
logger.debug("WebAuthn - server response packet: %s", pkt)
return pkt
def auth_switch_response(
self, sock: "MySQLSocket", auth_data: bytes, **kwargs: Any
) -> bytes:
"""Handles server's `auth switch request` response.
Args:
sock: Pointer to the socket connection.
auth_data: Plugin provided data (extracted from a packet
representing an `auth switch request` response).
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Last server's response after back-and-forth
communication.
"""
webauth_callback = kwargs.get("webauthn_callback") or kwargs.get(
"fido_callback"
)
self.callback = (
utils.import_object(webauth_callback)
if isinstance(webauth_callback, str)
else webauth_callback
)
response = self.auth_response(auth_data)
credential_id = None
if response == b"1":
# Authenticator doesn't support resident keys, request credential_id
logger.debug("WebAuthn - request credential_id")
sock.send(utils.lc_int(int(response)))
# return a packet representing an `auth more data` response
return bytes(sock.recv())
response = self.get_assertion_response(credential_id)
logger.debug("WebAuthn - request: %s size: %s", response, len(response))
sock.send(response)
pkt = bytes(sock.recv())
logger.debug("WebAuthn - server response packet: %s", pkt)
return pkt

View file

@ -0,0 +1,159 @@
# Copyright (c) 2023, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0, as
# published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an
# additional permission to link the program and your derivative works
# with the separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# Without limiting anything contained in the foregoing, this file,
# which is part of MySQL Connector/Python, is also subject to the
# Universal FOSS Exception, version 1.0, a copy of which can be found at
# http://oss.oracle.com/licenses/universal-foss-exception.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""Caching SHA2 Password Authentication Plugin."""
import struct
from hashlib import sha256
from typing import TYPE_CHECKING, Any, Optional
from ..errors import InterfaceError
from ..logger import logger
from . import MySQLAuthPlugin
if TYPE_CHECKING:
from ..network import MySQLSocket
AUTHENTICATION_PLUGIN_CLASS = "MySQLCachingSHA2PasswordAuthPlugin"
class MySQLCachingSHA2PasswordAuthPlugin(MySQLAuthPlugin):
"""Class implementing the MySQL caching_sha2_password authentication plugin
Note that encrypting using RSA is not supported since the Python
Standard Library does not provide this OpenSSL functionality.
"""
perform_full_authentication: int = 4
def _scramble(self, auth_data: bytes) -> bytes:
"""Return a scramble of the password using a Nonce sent by the
server.
The scramble is of the form:
XOR(SHA2(password), SHA2(SHA2(SHA2(password)), Nonce))
"""
if not auth_data:
raise InterfaceError("Missing authentication data (seed)")
if not self._password:
return b""
hash1 = sha256(self._password.encode()).digest()
hash2 = sha256()
hash2.update(sha256(hash1).digest())
hash2.update(auth_data)
hash2_digest = hash2.digest()
xored = [h1 ^ h2 for (h1, h2) in zip(hash1, hash2_digest)]
hash3 = struct.pack("32B", *xored)
return hash3
@property
def name(self) -> str:
"""Plugin official name."""
return "caching_sha2_password"
@property
def requires_ssl(self) -> bool:
"""Signals whether or not SSL is required."""
return False
def auth_response(self, auth_data: bytes, **kwargs: Any) -> Optional[bytes]:
"""Make the client's authorization response.
Args:
auth_data: Authorization data.
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Client's authorization response.
"""
if not auth_data:
return None
if len(auth_data) > 1:
return self._scramble(auth_data)
if auth_data[0] == self.perform_full_authentication:
# return password as clear text.
return self._password.encode() + b"\x00"
return None
def auth_more_response(
self, sock: "MySQLSocket", auth_data: bytes, **kwargs: Any
) -> bytes:
"""Handles server's `auth more data` response.
Args:
sock: Pointer to the socket connection.
auth_data: Authentication method data (from a packet representing
an `auth more data` response).
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Last server's response after back-and-forth
communication.
"""
response = self.auth_response(auth_data, **kwargs)
if response:
sock.send(response)
return bytes(sock.recv())
def auth_switch_response(
self, sock: "MySQLSocket", auth_data: bytes, **kwargs: Any
) -> bytes:
"""Handles server's `auth switch request` response.
Args:
sock: Pointer to the socket connection.
auth_data: Plugin provided data (extracted from a packet
representing an `auth switch request` response).
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Last server's response after back-and-forth
communication.
"""
response = self.auth_response(auth_data, **kwargs)
if response is None:
raise InterfaceError("Got a NULL auth response")
logger.debug("# request: %s size: %s", response, len(response))
sock.send(response)
pkt = bytes(sock.recv())
logger.debug("# server response packet: %s", pkt)
return pkt

View file

@ -0,0 +1,104 @@
# Copyright (c) 2023, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0, as
# published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an
# additional permission to link the program and your derivative works
# with the separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# Without limiting anything contained in the foregoing, this file,
# which is part of MySQL Connector/Python, is also subject to the
# Universal FOSS Exception, version 1.0, a copy of which can be found at
# http://oss.oracle.com/licenses/universal-foss-exception.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""Clear Password Authentication Plugin."""
from typing import TYPE_CHECKING, Any, Optional
from .. import errors
from ..logger import logger
from . import MySQLAuthPlugin
if TYPE_CHECKING:
from ..network import MySQLSocket
AUTHENTICATION_PLUGIN_CLASS = "MySQLClearPasswordAuthPlugin"
class MySQLClearPasswordAuthPlugin(MySQLAuthPlugin):
"""Class implementing the MySQL Clear Password authentication plugin"""
def _prepare_password(self) -> bytes:
"""Prepare and return password as as clear text.
Returns:
bytes: Prepared password.
"""
return self._password.encode() + b"\x00"
@property
def name(self) -> str:
"""Plugin official name."""
return "mysql_clear_password"
@property
def requires_ssl(self) -> bool:
"""Signals whether or not SSL is required."""
return True
def auth_response(self, auth_data: bytes, **kwargs: Any) -> Optional[bytes]:
"""Return the prepared password to send to MySQL.
Raises:
InterfaceError: When SSL is required by not enabled.
Returns:
str: The prepared password.
"""
if self.requires_ssl and not self._ssl_enabled:
raise errors.InterfaceError(f"{self.name} requires SSL")
return self._prepare_password()
def auth_switch_response(
self, sock: "MySQLSocket", auth_data: bytes, **kwargs: Any
) -> bytes:
"""Handles server's `auth switch request` response.
Args:
sock: Pointer to the socket connection.
auth_data: Plugin provided data (extracted from a packet
representing an `auth switch request` response).
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Last server's response after back-and-forth
communication.
"""
response = self.auth_response(auth_data, **kwargs)
if response is None:
raise errors.InterfaceError("Got a NULL auth response")
logger.debug("# request: %s size: %s", response, len(response))
sock.send(response)
pkt = bytes(sock.recv())
logger.debug("# server response packet: %s", pkt)
return pkt

View file

@ -0,0 +1,120 @@
# Copyright (c) 2023, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0, as
# published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an
# additional permission to link the program and your derivative works
# with the separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# Without limiting anything contained in the foregoing, this file,
# which is part of MySQL Connector/Python, is also subject to the
# Universal FOSS Exception, version 1.0, a copy of which can be found at
# http://oss.oracle.com/licenses/universal-foss-exception.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""Native Password Authentication Plugin."""
import struct
from hashlib import sha1
from typing import TYPE_CHECKING, Any, Optional
from ..errors import InterfaceError
from ..logger import logger
from . import MySQLAuthPlugin
if TYPE_CHECKING:
from ..network import MySQLSocket
AUTHENTICATION_PLUGIN_CLASS = "MySQLNativePasswordAuthPlugin"
class MySQLNativePasswordAuthPlugin(MySQLAuthPlugin):
"""Class implementing the MySQL Native Password authentication plugin"""
def _prepare_password(self, auth_data: bytes) -> bytes:
"""Prepares and returns password as native MySQL 4.1+ password"""
if not auth_data:
raise InterfaceError("Missing authentication data (seed)")
if not self._password:
return b""
hash4 = None
try:
hash1 = sha1(self._password.encode()).digest()
hash2 = sha1(hash1).digest()
hash3 = sha1(auth_data + hash2).digest()
xored = [h1 ^ h3 for (h1, h3) in zip(hash1, hash3)]
hash4 = struct.pack("20B", *xored)
except (struct.error, TypeError) as err:
raise InterfaceError(f"Failed scrambling password; {err}") from err
return hash4
@property
def name(self) -> str:
"""Plugin official name."""
return "mysql_native_password"
@property
def requires_ssl(self) -> bool:
"""Signals whether or not SSL is required."""
return False
def auth_response(self, auth_data: bytes, **kwargs: Any) -> Optional[bytes]:
"""Make the client's authorization response.
Args:
auth_data: Authorization data.
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Client's authorization response.
"""
return self._prepare_password(auth_data)
def auth_switch_response(
self, sock: "MySQLSocket", auth_data: bytes, **kwargs: Any
) -> bytes:
"""Handles server's `auth switch request` response.
Args:
sock: Pointer to the socket connection.
auth_data: Plugin provided data (extracted from a packet
representing an `auth switch request` response).
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Last server's response after back-and-forth
communication.
"""
response = self.auth_response(auth_data, **kwargs)
if response is None:
raise InterfaceError("Got a NULL auth response")
logger.debug("# request: %s size: %s", response, len(response))
sock.send(response)
pkt = bytes(sock.recv())
logger.debug("# server response packet: %s", pkt)
return pkt

View file

@ -0,0 +1,108 @@
# Copyright (c) 2023, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0, as
# published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an
# additional permission to link the program and your derivative works
# with the separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# Without limiting anything contained in the foregoing, this file,
# which is part of MySQL Connector/Python, is also subject to the
# Universal FOSS Exception, version 1.0, a copy of which can be found at
# http://oss.oracle.com/licenses/universal-foss-exception.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""SHA256 Password Authentication Plugin."""
from typing import TYPE_CHECKING, Any, Optional
from .. import errors
from ..logger import logger
from . import MySQLAuthPlugin
if TYPE_CHECKING:
from ..network import MySQLSocket
AUTHENTICATION_PLUGIN_CLASS = "MySQLSHA256PasswordAuthPlugin"
class MySQLSHA256PasswordAuthPlugin(MySQLAuthPlugin):
"""Class implementing the MySQL SHA256 authentication plugin
Note that encrypting using RSA is not supported since the Python
Standard Library does not provide this OpenSSL functionality.
"""
def _prepare_password(self) -> bytes:
"""Prepare and return password as as clear text.
Returns:
password (bytes): Prepared password.
"""
return self._password.encode() + b"\x00"
@property
def name(self) -> str:
"""Plugin official name."""
return "sha256_password"
@property
def requires_ssl(self) -> bool:
"""Signals whether or not SSL is required."""
return True
def auth_response(self, auth_data: bytes, **kwargs: Any) -> Optional[bytes]:
"""Return the prepared password to send to MySQL.
Raises:
InterfaceError: When SSL is required by not enabled.
Returns:
str: The prepared password.
"""
if self.requires_ssl and not self.ssl_enabled:
raise errors.InterfaceError(f"{self.name} requires SSL")
return self._prepare_password()
def auth_switch_response(
self, sock: "MySQLSocket", auth_data: bytes, **kwargs: Any
) -> bytes:
"""Handles server's `auth switch request` response.
Args:
sock: Pointer to the socket connection.
auth_data: Plugin provided data (extracted from a packet
representing an `auth switch request` response).
kwargs: Custom configuration to be passed to the auth plugin
when invoked. The parameters defined here will override the ones
defined in the auth plugin itself.
Returns:
packet: Last server's response after back-and-forth
communication.
"""
response = self.auth_response(auth_data, **kwargs)
if response is None:
raise errors.InterfaceError("Got a NULL auth response")
logger.debug("# request: %s size: %s", response, len(response))
sock.send(response)
pkt = bytes(sock.recv())
logger.debug("# server response packet: %s", pkt)
return pkt