Skip to content

Module mudproto.mccp

MUD Client Compression protocol.

Class MCCPMixIn(TelnetInterface)

An MCCP mix in class for the Telnet protocol.

Source code in mudproto/mccp.py
class MCCPMixIn(TelnetInterface):
	"""An MCCP mix in class for the Telnet protocol."""

	def __init__(self, *args: Any, **kwargs: Any) -> None:
		"""
		Defines the constructor.

		Args:
			*args: Positional arguments to be passed to the parent constructor.
			**kwargs: Key-word only arguments to be passed to the parent constructor.
		"""
		super().__init__(*args, **kwargs)
		self.subnegotiation_map[MCCP1] = lambda *args: None
		self.subnegotiation_map[MCCP2] = lambda *args: None
		self._compression_enabled: bool = False
		self._mccp_version: int | None = None
		self._compressed_input_buffer: bytearray = bytearray()
		self._decompressor: Any = None

	def disable_mccp(self) -> None:
		"""Disables compression."""
		self._mccp_version = None
		self._compression_enabled = False
		self._decompressor = None

	def on_data_received(self, data: bytes) -> None:  # NOQA: D102
		input_buffer: bytearray = self._compressed_input_buffer
		input_buffer.extend(data)
		while input_buffer:
			if self._compression_enabled:
				# Data is compressed.
				super().on_data_received(self._decompressor.decompress(input_buffer))
				input_buffer.clear()
				if self._decompressor.unused_data:
					# Uncompressed data following the compressed data.
					# Likely due to the server terminating compression.
					logger.debug(
						"received uncompressed data while compression enabled. Disabling compression."
					)
					input_buffer.extend(self._decompressor.unused_data)
					state = self.get_option_state(MCCP1 if self._mccp_version == 1 else MCCP2)
					state.him.enabled = False
					state.him.negotiating = False
					self.disable_mccp()
					continue  # Process the remaining uncompressed data.
				return  # input_buffer is empty, no need to loop again.
			# Data is uncompressed.
			iac_index: int = input_buffer.find(IAC)
			if self._mccp_version is not None and iac_index != -1:
				# MCCP was negotiated on, and an IAC byte was found.
				if iac_index > 0:
					super().on_data_received(bytes(input_buffer[:iac_index]))
					del input_buffer[:iac_index]
				if input_buffer == IAC:
					# Partial IAC sequence.
					return
				if input_buffer.startswith(IAC_SB):
					se_index: int = input_buffer.find(SE)
					if se_index == -1:
						# Partial subnegotiation sequence.
						return
					if input_buffer.startswith(MCCP_ENABLED_RESPONSES):
						# The server enabled compression. Subsequent data will be compressed.
						self._compression_enabled = True
						self._decompressor = zlib.decompressobj(zlib.MAX_WBITS)
						logger.debug("Peer notifies us that subsequent data will be compressed.")
					else:
						# We don't care about other subnegotiations, pass it on.
						super().on_data_received(bytes(input_buffer[: se_index + 1]))
					del input_buffer[: se_index + 1]
				else:
					# We don't care about other IAC sequences, pass it on.
					super().on_data_received(bytes(input_buffer[:2]))
					del input_buffer[:2]
			else:
				# MCCP was not negotiated on, or no IAC was found.
				super().on_data_received(bytes(input_buffer))
				input_buffer.clear()

	def on_enable_remote(self, option: bytes) -> bool:  # NOQA: D102
		if option in {MCCP1, MCCP2}:
			if self._mccp_version is None:
				self._mccp_version = 1 if option == MCCP1 else 2
				logger.debug(f"MCCP{self._mccp_version} negotiation enabled.")
				return True
			return False
		return bool(super().on_enable_remote(option))  # pragma: no cover

	def on_disable_remote(self, option: bytes) -> None:  # NOQA: D102
		if option in {MCCP1, MCCP2}:
			logger.debug(
				f"MCCP{self._mccp_version if self._mccp_version is not None else ''} negotiation disabled."
			)
			self.disable_mccp()
			return
		super().on_disable_remote(option)  # type: ignore[safe-super]  # pragma: no cover

Attribute is_client: bool inherited property readonly

True if acting as a client, False otherwise.

Attribute is_server: bool inherited property readonly

True if acting as a server, False otherwise.

Method __init__(self, *args, **kwargs) special

Defines the constructor.

Parameters:

Name Type Description Default
*args Any

Positional arguments to be passed to the parent constructor.

()
**kwargs Any

Key-word only arguments to be passed to the parent constructor.

{}
Source code in mudproto/mccp.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
	"""
	Defines the constructor.

	Args:
		*args: Positional arguments to be passed to the parent constructor.
		**kwargs: Key-word only arguments to be passed to the parent constructor.
	"""
	super().__init__(*args, **kwargs)
	self.subnegotiation_map[MCCP1] = lambda *args: None
	self.subnegotiation_map[MCCP2] = lambda *args: None
	self._compression_enabled: bool = False
	self._mccp_version: int | None = None
	self._compressed_input_buffer: bytearray = bytearray()
	self._decompressor: Any = None

Method disable_mccp(self)

Disables compression.

Source code in mudproto/mccp.py
def disable_mccp(self) -> None:
	"""Disables compression."""
	self._mccp_version = None
	self._compression_enabled = False
	self._decompressor = None

Method do(self, option) inherited

Requests that the peer enable an option.

Parameters:

Name Type Description Default
option bytes

The option to enable.

required
Source code in mudproto/mccp.py
@abstractmethod
def do(self, option: bytes) -> None:
	"""
	Requests that the peer enable an option.

	Args:
		option: The option to enable.
	"""

Method dont(self, option) inherited

Requests that the peer disable an option.

Parameters:

Name Type Description Default
option bytes

The option to disable.

required
Source code in mudproto/mccp.py
@abstractmethod
def dont(self, option: bytes) -> None:
	"""
	Requests that the peer disable an option.

	Args:
		option: The option to disable.
	"""

Method get_option_state(self, option) inherited

Gets the state of a Telnet option.

Parameters:

Name Type Description Default
option bytes

The option to get state.

required

Returns:

Type Description
_OptionState

An object containing the option state.

Source code in mudproto/mccp.py
@abstractmethod
def get_option_state(self, option: bytes) -> _OptionState:
	"""
	Gets the state of a Telnet option.

	Args:
		option: The option to get state.

	Returns:
		An object containing the option state.
	"""

Method on_command(self, command, option) inherited

Called when a 1 or 2 byte command is received.

Parameters:

Name Type Description Default
command bytes

The first byte in a 1 or 2 byte negotiation sequence.

required
option bytes | None

The second byte in a 2 byte negotiation sequence or None.

required
Source code in mudproto/mccp.py
@abstractmethod
def on_command(self, command: bytes, option: bytes | None) -> None:
	"""
	Called when a 1 or 2 byte command is received.

	Args:
		command: The first byte in a 1 or 2 byte negotiation sequence.
		option: The second byte in a 2 byte negotiation sequence or None.
	"""

Method on_connection_lost(self) inherited

Called by disconnect when a connection to peer has been lost.

Source code in mudproto/mccp.py
@abstractmethod
def on_connection_lost(self) -> None:
	"""Called by `disconnect` when a connection to peer has been lost."""

Method on_connection_made(self) inherited

Called by connect when a connection to peer has been established.

Source code in mudproto/mccp.py
@abstractmethod
def on_connection_made(self) -> None:
	"""Called by `connect` when a connection to peer has been established."""

Method on_data_received(self, data)

Called by parse when data is received.

Parameters:

Name Type Description Default
data bytes

The received data.

required
Source code in mudproto/mccp.py
def on_data_received(self, data: bytes) -> None:  # NOQA: D102
	input_buffer: bytearray = self._compressed_input_buffer
	input_buffer.extend(data)
	while input_buffer:
		if self._compression_enabled:
			# Data is compressed.
			super().on_data_received(self._decompressor.decompress(input_buffer))
			input_buffer.clear()
			if self._decompressor.unused_data:
				# Uncompressed data following the compressed data.
				# Likely due to the server terminating compression.
				logger.debug(
					"received uncompressed data while compression enabled. Disabling compression."
				)
				input_buffer.extend(self._decompressor.unused_data)
				state = self.get_option_state(MCCP1 if self._mccp_version == 1 else MCCP2)
				state.him.enabled = False
				state.him.negotiating = False
				self.disable_mccp()
				continue  # Process the remaining uncompressed data.
			return  # input_buffer is empty, no need to loop again.
		# Data is uncompressed.
		iac_index: int = input_buffer.find(IAC)
		if self._mccp_version is not None and iac_index != -1:
			# MCCP was negotiated on, and an IAC byte was found.
			if iac_index > 0:
				super().on_data_received(bytes(input_buffer[:iac_index]))
				del input_buffer[:iac_index]
			if input_buffer == IAC:
				# Partial IAC sequence.
				return
			if input_buffer.startswith(IAC_SB):
				se_index: int = input_buffer.find(SE)
				if se_index == -1:
					# Partial subnegotiation sequence.
					return
				if input_buffer.startswith(MCCP_ENABLED_RESPONSES):
					# The server enabled compression. Subsequent data will be compressed.
					self._compression_enabled = True
					self._decompressor = zlib.decompressobj(zlib.MAX_WBITS)
					logger.debug("Peer notifies us that subsequent data will be compressed.")
				else:
					# We don't care about other subnegotiations, pass it on.
					super().on_data_received(bytes(input_buffer[: se_index + 1]))
				del input_buffer[: se_index + 1]
			else:
				# We don't care about other IAC sequences, pass it on.
				super().on_data_received(bytes(input_buffer[:2]))
				del input_buffer[:2]
		else:
			# MCCP was not negotiated on, or no IAC was found.
			super().on_data_received(bytes(input_buffer))
			input_buffer.clear()

Method on_disable_local(self, option) inherited

Disables a locally managed option.

This method is called before we disable a locally enabled option, in order to perform any necessary cleanup.

Note

If on_enable_local is overridden, this method must be overridden as well.

Parameters:

Name Type Description Default
option bytes

The option being disabled.

required
Source code in mudproto/mccp.py
@abstractmethod
def on_disable_local(self, option: bytes) -> None:
	"""
	Disables a locally managed option.

	This method is called before we disable a
	locally enabled option, in order to perform any necessary cleanup.

	Note:
		If on_enable_local is overridden, this method must be overridden as well.

	Args:
		option: The option being disabled.
	"""
	raise NotImplementedError(f"Don't know how to disable local Telnet option {option!r}")

Method on_disable_remote(self, option)

Disables a remotely managed option.

This method is called when peer disables a remotely enabled option, in order to perform any necessary cleanup on our end.

Note

If on_enable_remote is overridden, this method must be overridden as well.

Parameters:

Name Type Description Default
option bytes

The option being disabled.

required
Source code in mudproto/mccp.py
def on_disable_remote(self, option: bytes) -> None:  # NOQA: D102
	if option in {MCCP1, MCCP2}:
		logger.debug(
			f"MCCP{self._mccp_version if self._mccp_version is not None else ''} negotiation disabled."
		)
		self.disable_mccp()
		return
	super().on_disable_remote(option)  # type: ignore[safe-super]  # pragma: no cover

Method on_enable_local(self, option) inherited

Called to accept or reject the request for us to manage the option.

Parameters:

Name Type Description Default
option bytes

The option that peer requests us to handle.

required

Returns:

Type Description
bool

True if we will handle the option, False otherwise.

Source code in mudproto/mccp.py
@abstractmethod
def on_enable_local(self, option: bytes) -> bool:
	"""
	Called to accept or reject the request for us to manage the option.

	Args:
		option: The option that peer requests us to handle.

	Returns:
		True if we will handle the option, False otherwise.
	"""
	return False  # Reject all options by default.

Method on_enable_remote(self, option)

Called to accept or reject the request for peer to manage the option.

Parameters:

Name Type Description Default
option bytes

The option that peer wants to handle.

required

Returns:

Type Description
bool

True if we will allow peer to handle the option, False otherwise.

Source code in mudproto/mccp.py
def on_enable_remote(self, option: bytes) -> bool:  # NOQA: D102
	if option in {MCCP1, MCCP2}:
		if self._mccp_version is None:
			self._mccp_version = 1 if option == MCCP1 else 2
			logger.debug(f"MCCP{self._mccp_version} negotiation enabled.")
			return True
		return False
	return bool(super().on_enable_remote(option))  # pragma: no cover

Method on_option_enabled(self, option) inherited

Called after an option has been fully enabled.

Parameters:

Name Type Description Default
option bytes

The option that has been enabled.

required
Source code in mudproto/mccp.py
@abstractmethod
def on_option_enabled(self, option: bytes) -> None:
	"""
	Called after an option has been fully enabled.

	Args:
		option: The option that has been enabled.
	"""

Method on_subnegotiation(self, option, data) inherited

Called when a subnegotiation is received.

Parameters:

Name Type Description Default
option bytes

The subnegotiation option.

required
data bytes

The payload.

required
Source code in mudproto/mccp.py
@abstractmethod
def on_subnegotiation(self, option: bytes, data: bytes) -> None:
	"""
	Called when a subnegotiation is received.

	Args:
		option: The subnegotiation option.
		data: The payload.
	"""

Method on_unhandled_command(self, command, option) inherited

Called for commands for which no handler is installed.

Parameters:

Name Type Description Default
command bytes

The first byte in a 1 or 2 byte negotiation sequence.

required
option bytes | None

The second byte in a 2 byte negotiation sequence or None.

required
Source code in mudproto/mccp.py
@abstractmethod
def on_unhandled_command(self, command: bytes, option: bytes | None) -> None:
	"""
	Called for commands for which no handler is installed.

	Args:
		command: The first byte in a 1 or 2 byte negotiation sequence.
		option: The second byte in a 2 byte negotiation sequence or None.
	"""

Method on_unhandled_subnegotiation(self, option, data) inherited

Called for subnegotiations for which no handler is installed.

Parameters:

Name Type Description Default
option bytes

The subnegotiation option.

required
data bytes

The payload.

required
Source code in mudproto/mccp.py
@abstractmethod
def on_unhandled_subnegotiation(self, option: bytes, data: bytes) -> None:
	"""
	Called for subnegotiations for which no handler is installed.

	Args:
		option: The subnegotiation option.
		data: The payload.
	"""

Method request_negotiation(self, option, data) inherited

Sends a subnegotiation message to the peer.

Parameters:

Name Type Description Default
option bytes

The subnegotiation option.

required
data bytes

The payload.

required
Source code in mudproto/mccp.py
@abstractmethod
def request_negotiation(self, option: bytes, data: bytes) -> None:
	"""
	Sends a subnegotiation message to the peer.

	Args:
		option: The subnegotiation option.
		data: The payload.
	"""

Method will(self, option) inherited

Indicates our willingness to enable an option.

Parameters:

Name Type Description Default
option bytes

The option to accept.

required
Source code in mudproto/mccp.py
@abstractmethod
def will(self, option: bytes) -> None:
	"""
	Indicates our willingness to enable an option.

	Args:
		option: The option to accept.
	"""

Method wont(self, option) inherited

Indicates we are not willing to enable an option.

Parameters:

Name Type Description Default
option bytes

The option to reject.

required
Source code in mudproto/mccp.py
@abstractmethod
def wont(self, option: bytes) -> None:
	"""
	Indicates we are not willing to enable an option.

	Args:
		option: The option to reject.
	"""

Method write(self, data) inherited

Writes data to peer.

Parameters:

Name Type Description Default
data bytes

The bytes to be written.

required
Source code in mudproto/mccp.py
def write(self, data: bytes) -> None:
	"""
	Writes data to peer.

	Args:
		data: The bytes to be written.
	"""
	self._writer(data)