Module mudproto.mccp
¶
MUD Client Compression protocol.
Class MCCPMixIn(TelnetInterface)
¶
An MCCP mix in class for the Telnet protocol.
Source code in mudproto/mccp.py
class MCCPMixIn(TelnetInterface):
"""An MCCP mix in class for the Telnet protocol."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""
Defines the constructor.
Args:
*args: Positional arguments to be passed to the parent constructor.
**kwargs: Key-word only arguments to be passed to the parent constructor.
"""
super().__init__(*args, **kwargs)
self.subnegotiation_map[MCCP1] = lambda *args: None
self.subnegotiation_map[MCCP2] = lambda *args: None
self._compression_enabled: bool = False
self._mccp_version: int | None = None
self._compressed_input_buffer: bytearray = bytearray()
self._decompressor: Any = None
def disable_mccp(self) -> None:
"""Disables compression."""
self._mccp_version = None
self._compression_enabled = False
self._decompressor = None
def on_data_received(self, data: bytes) -> None: # NOQA: D102
input_buffer: bytearray = self._compressed_input_buffer
input_buffer.extend(data)
while input_buffer:
if self._compression_enabled:
# Data is compressed.
super().on_data_received(self._decompressor.decompress(input_buffer))
input_buffer.clear()
if self._decompressor.unused_data:
# Uncompressed data following the compressed data.
# Likely due to the server terminating compression.
logger.debug(
"received uncompressed data while compression enabled. Disabling compression."
)
input_buffer.extend(self._decompressor.unused_data)
state = self.get_option_state(MCCP1 if self._mccp_version == 1 else MCCP2)
state.him.enabled = False
state.him.negotiating = False
self.disable_mccp()
continue # Process the remaining uncompressed data.
return # input_buffer is empty, no need to loop again.
# Data is uncompressed.
iac_index: int = input_buffer.find(IAC)
if self._mccp_version is not None and iac_index != -1:
# MCCP was negotiated on, and an IAC byte was found.
if iac_index > 0:
super().on_data_received(bytes(input_buffer[:iac_index]))
del input_buffer[:iac_index]
if input_buffer == IAC:
# Partial IAC sequence.
return
if input_buffer.startswith(IAC_SB):
se_index: int = input_buffer.find(SE)
if se_index == -1:
# Partial subnegotiation sequence.
return
if input_buffer.startswith(MCCP_ENABLED_RESPONSES):
# The server enabled compression. Subsequent data will be compressed.
self._compression_enabled = True
self._decompressor = zlib.decompressobj(zlib.MAX_WBITS)
logger.debug("Peer notifies us that subsequent data will be compressed.")
else:
# We don't care about other subnegotiations, pass it on.
super().on_data_received(bytes(input_buffer[: se_index + 1]))
del input_buffer[: se_index + 1]
else:
# We don't care about other IAC sequences, pass it on.
super().on_data_received(bytes(input_buffer[:2]))
del input_buffer[:2]
else:
# MCCP was not negotiated on, or no IAC was found.
super().on_data_received(bytes(input_buffer))
input_buffer.clear()
def on_enable_remote(self, option: bytes) -> bool: # NOQA: D102
if option in {MCCP1, MCCP2}:
if self._mccp_version is None:
self._mccp_version = 1 if option == MCCP1 else 2
logger.debug(f"MCCP{self._mccp_version} negotiation enabled.")
return True
return False
return bool(super().on_enable_remote(option)) # pragma: no cover
def on_disable_remote(self, option: bytes) -> None: # NOQA: D102
if option in {MCCP1, MCCP2}:
logger.debug(
f"MCCP{self._mccp_version if self._mccp_version is not None else ''} negotiation disabled."
)
self.disable_mccp()
return
super().on_disable_remote(option) # type: ignore[safe-super] # pragma: no cover
Attribute is_client: bool
inherited
property
readonly
¶
True if acting as a client, False otherwise.
Attribute is_server: bool
inherited
property
readonly
¶
True if acting as a server, False otherwise.
Method __init__(self, *args, **kwargs)
special
¶
Defines the constructor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*args |
Any |
Positional arguments to be passed to the parent constructor. |
() |
**kwargs |
Any |
Key-word only arguments to be passed to the parent constructor. |
{} |
Source code in mudproto/mccp.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""
Defines the constructor.
Args:
*args: Positional arguments to be passed to the parent constructor.
**kwargs: Key-word only arguments to be passed to the parent constructor.
"""
super().__init__(*args, **kwargs)
self.subnegotiation_map[MCCP1] = lambda *args: None
self.subnegotiation_map[MCCP2] = lambda *args: None
self._compression_enabled: bool = False
self._mccp_version: int | None = None
self._compressed_input_buffer: bytearray = bytearray()
self._decompressor: Any = None
Method disable_mccp(self)
¶
Disables compression.
Source code in mudproto/mccp.py
def disable_mccp(self) -> None:
"""Disables compression."""
self._mccp_version = None
self._compression_enabled = False
self._decompressor = None
Method do(self, option)
inherited
¶
Requests that the peer enable an option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option to enable. |
required |
Source code in mudproto/mccp.py
@abstractmethod
def do(self, option: bytes) -> None:
"""
Requests that the peer enable an option.
Args:
option: The option to enable.
"""
Method dont(self, option)
inherited
¶
Requests that the peer disable an option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option to disable. |
required |
Source code in mudproto/mccp.py
@abstractmethod
def dont(self, option: bytes) -> None:
"""
Requests that the peer disable an option.
Args:
option: The option to disable.
"""
Method get_option_state(self, option)
inherited
¶
Gets the state of a Telnet option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option to get state. |
required |
Returns:
| Type | Description |
|---|---|
_OptionState |
An object containing the option state. |
Source code in mudproto/mccp.py
@abstractmethod
def get_option_state(self, option: bytes) -> _OptionState:
"""
Gets the state of a Telnet option.
Args:
option: The option to get state.
Returns:
An object containing the option state.
"""
Method on_command(self, command, option)
inherited
¶
Called when a 1 or 2 byte command is received.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
command |
bytes |
The first byte in a 1 or 2 byte negotiation sequence. |
required |
option |
bytes | None |
The second byte in a 2 byte negotiation sequence or None. |
required |
Source code in mudproto/mccp.py
@abstractmethod
def on_command(self, command: bytes, option: bytes | None) -> None:
"""
Called when a 1 or 2 byte command is received.
Args:
command: The first byte in a 1 or 2 byte negotiation sequence.
option: The second byte in a 2 byte negotiation sequence or None.
"""
Method on_connection_lost(self)
inherited
¶
Called by disconnect when a connection to peer has been lost.
Source code in mudproto/mccp.py
@abstractmethod
def on_connection_lost(self) -> None:
"""Called by `disconnect` when a connection to peer has been lost."""
Method on_connection_made(self)
inherited
¶
Called by connect when a connection to peer has been established.
Source code in mudproto/mccp.py
@abstractmethod
def on_connection_made(self) -> None:
"""Called by `connect` when a connection to peer has been established."""
Method on_data_received(self, data)
¶
Called by parse when data is received.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data |
bytes |
The received data. |
required |
Source code in mudproto/mccp.py
def on_data_received(self, data: bytes) -> None: # NOQA: D102
input_buffer: bytearray = self._compressed_input_buffer
input_buffer.extend(data)
while input_buffer:
if self._compression_enabled:
# Data is compressed.
super().on_data_received(self._decompressor.decompress(input_buffer))
input_buffer.clear()
if self._decompressor.unused_data:
# Uncompressed data following the compressed data.
# Likely due to the server terminating compression.
logger.debug(
"received uncompressed data while compression enabled. Disabling compression."
)
input_buffer.extend(self._decompressor.unused_data)
state = self.get_option_state(MCCP1 if self._mccp_version == 1 else MCCP2)
state.him.enabled = False
state.him.negotiating = False
self.disable_mccp()
continue # Process the remaining uncompressed data.
return # input_buffer is empty, no need to loop again.
# Data is uncompressed.
iac_index: int = input_buffer.find(IAC)
if self._mccp_version is not None and iac_index != -1:
# MCCP was negotiated on, and an IAC byte was found.
if iac_index > 0:
super().on_data_received(bytes(input_buffer[:iac_index]))
del input_buffer[:iac_index]
if input_buffer == IAC:
# Partial IAC sequence.
return
if input_buffer.startswith(IAC_SB):
se_index: int = input_buffer.find(SE)
if se_index == -1:
# Partial subnegotiation sequence.
return
if input_buffer.startswith(MCCP_ENABLED_RESPONSES):
# The server enabled compression. Subsequent data will be compressed.
self._compression_enabled = True
self._decompressor = zlib.decompressobj(zlib.MAX_WBITS)
logger.debug("Peer notifies us that subsequent data will be compressed.")
else:
# We don't care about other subnegotiations, pass it on.
super().on_data_received(bytes(input_buffer[: se_index + 1]))
del input_buffer[: se_index + 1]
else:
# We don't care about other IAC sequences, pass it on.
super().on_data_received(bytes(input_buffer[:2]))
del input_buffer[:2]
else:
# MCCP was not negotiated on, or no IAC was found.
super().on_data_received(bytes(input_buffer))
input_buffer.clear()
Method on_disable_local(self, option)
inherited
¶
Disables a locally managed option.
This method is called before we disable a locally enabled option, in order to perform any necessary cleanup.
Note
If on_enable_local is overridden, this method must be overridden as well.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option being disabled. |
required |
Source code in mudproto/mccp.py
@abstractmethod
def on_disable_local(self, option: bytes) -> None:
"""
Disables a locally managed option.
This method is called before we disable a
locally enabled option, in order to perform any necessary cleanup.
Note:
If on_enable_local is overridden, this method must be overridden as well.
Args:
option: The option being disabled.
"""
raise NotImplementedError(f"Don't know how to disable local Telnet option {option!r}")
Method on_disable_remote(self, option)
¶
Disables a remotely managed option.
This method is called when peer disables a remotely enabled option, in order to perform any necessary cleanup on our end.
Note
If on_enable_remote is overridden, this method must be overridden as well.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option being disabled. |
required |
Source code in mudproto/mccp.py
def on_disable_remote(self, option: bytes) -> None: # NOQA: D102
if option in {MCCP1, MCCP2}:
logger.debug(
f"MCCP{self._mccp_version if self._mccp_version is not None else ''} negotiation disabled."
)
self.disable_mccp()
return
super().on_disable_remote(option) # type: ignore[safe-super] # pragma: no cover
Method on_enable_local(self, option)
inherited
¶
Called to accept or reject the request for us to manage the option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option that peer requests us to handle. |
required |
Returns:
| Type | Description |
|---|---|
bool |
True if we will handle the option, False otherwise. |
Source code in mudproto/mccp.py
@abstractmethod
def on_enable_local(self, option: bytes) -> bool:
"""
Called to accept or reject the request for us to manage the option.
Args:
option: The option that peer requests us to handle.
Returns:
True if we will handle the option, False otherwise.
"""
return False # Reject all options by default.
Method on_enable_remote(self, option)
¶
Called to accept or reject the request for peer to manage the option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option that peer wants to handle. |
required |
Returns:
| Type | Description |
|---|---|
bool |
True if we will allow peer to handle the option, False otherwise. |
Source code in mudproto/mccp.py
def on_enable_remote(self, option: bytes) -> bool: # NOQA: D102
if option in {MCCP1, MCCP2}:
if self._mccp_version is None:
self._mccp_version = 1 if option == MCCP1 else 2
logger.debug(f"MCCP{self._mccp_version} negotiation enabled.")
return True
return False
return bool(super().on_enable_remote(option)) # pragma: no cover
Method on_option_enabled(self, option)
inherited
¶
Called after an option has been fully enabled.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option that has been enabled. |
required |
Source code in mudproto/mccp.py
@abstractmethod
def on_option_enabled(self, option: bytes) -> None:
"""
Called after an option has been fully enabled.
Args:
option: The option that has been enabled.
"""
Method on_subnegotiation(self, option, data)
inherited
¶
Called when a subnegotiation is received.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The subnegotiation option. |
required |
data |
bytes |
The payload. |
required |
Source code in mudproto/mccp.py
@abstractmethod
def on_subnegotiation(self, option: bytes, data: bytes) -> None:
"""
Called when a subnegotiation is received.
Args:
option: The subnegotiation option.
data: The payload.
"""
Method on_unhandled_command(self, command, option)
inherited
¶
Called for commands for which no handler is installed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
command |
bytes |
The first byte in a 1 or 2 byte negotiation sequence. |
required |
option |
bytes | None |
The second byte in a 2 byte negotiation sequence or None. |
required |
Source code in mudproto/mccp.py
@abstractmethod
def on_unhandled_command(self, command: bytes, option: bytes | None) -> None:
"""
Called for commands for which no handler is installed.
Args:
command: The first byte in a 1 or 2 byte negotiation sequence.
option: The second byte in a 2 byte negotiation sequence or None.
"""
Method on_unhandled_subnegotiation(self, option, data)
inherited
¶
Called for subnegotiations for which no handler is installed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The subnegotiation option. |
required |
data |
bytes |
The payload. |
required |
Source code in mudproto/mccp.py
@abstractmethod
def on_unhandled_subnegotiation(self, option: bytes, data: bytes) -> None:
"""
Called for subnegotiations for which no handler is installed.
Args:
option: The subnegotiation option.
data: The payload.
"""
Method request_negotiation(self, option, data)
inherited
¶
Sends a subnegotiation message to the peer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The subnegotiation option. |
required |
data |
bytes |
The payload. |
required |
Source code in mudproto/mccp.py
@abstractmethod
def request_negotiation(self, option: bytes, data: bytes) -> None:
"""
Sends a subnegotiation message to the peer.
Args:
option: The subnegotiation option.
data: The payload.
"""
Method will(self, option)
inherited
¶
Indicates our willingness to enable an option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option to accept. |
required |
Source code in mudproto/mccp.py
@abstractmethod
def will(self, option: bytes) -> None:
"""
Indicates our willingness to enable an option.
Args:
option: The option to accept.
"""
Method wont(self, option)
inherited
¶
Indicates we are not willing to enable an option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option to reject. |
required |
Source code in mudproto/mccp.py
@abstractmethod
def wont(self, option: bytes) -> None:
"""
Indicates we are not willing to enable an option.
Args:
option: The option to reject.
"""
Method write(self, data)
inherited
¶
Writes data to peer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data |
bytes |
The bytes to be written. |
required |
Source code in mudproto/mccp.py
def write(self, data: bytes) -> None:
"""
Writes data to peer.
Args:
data: The bytes to be written.
"""
self._writer(data)