Skip to content

Module mudproto.telnet

Telnet protocol.

Class TelnetError(Exception)

Implements the base class for Telnet exceptions.

Source code in mudproto/telnet.py
class TelnetError(Exception):
	"""Implements the base class for Telnet exceptions."""

Class TelnetInterface(ConnectionInterface)

Defines the interface for the Telnet protocol.

Source code in mudproto/telnet.py
class TelnetInterface(ConnectionInterface):
	"""Defines the interface for the Telnet protocol."""

	command_map: TelnetCommandMapType
	"""A mapping of bytes to callables."""
	subnegotiation_map: TelnetSubnegotiationMapType
	"""A mapping of bytes to callables."""

	@abstractmethod
	def will(self, option: bytes) -> None:
		"""
		Indicates our willingness to enable an option.

		Args:
			option: The option to accept.
		"""

	@abstractmethod
	def wont(self, option: bytes) -> None:
		"""
		Indicates we are not willing to enable an option.

		Args:
			option: The option to reject.
		"""

	@abstractmethod
	def do(self, option: bytes) -> None:
		"""
		Requests that the peer enable an option.

		Args:
			option: The option to enable.
		"""

	@abstractmethod
	def dont(self, option: bytes) -> None:
		"""
		Requests that the peer disable an option.

		Args:
			option: The option to disable.
		"""

	@abstractmethod
	def get_option_state(self, option: bytes) -> _OptionState:
		"""
		Gets the state of a Telnet option.

		Args:
			option: The option to get state.

		Returns:
			An object containing the option state.
		"""

	@abstractmethod
	def request_negotiation(self, option: bytes, data: bytes) -> None:
		"""
		Sends a subnegotiation message to the peer.

		Args:
			option: The subnegotiation option.
			data: The payload.
		"""

	@abstractmethod
	def on_command(self, command: bytes, option: bytes | None) -> None:
		"""
		Called when a 1 or 2 byte command is received.

		Args:
			command: The first byte in a 1 or 2 byte negotiation sequence.
			option: The second byte in a 2 byte negotiation sequence or None.
		"""

	@abstractmethod
	def on_subnegotiation(self, option: bytes, data: bytes) -> None:
		"""
		Called when a subnegotiation is received.

		Args:
			option: The subnegotiation option.
			data: The payload.
		"""

	@abstractmethod
	def on_unhandled_command(self, command: bytes, option: bytes | None) -> None:
		"""
		Called for commands for which no handler is installed.

		Args:
			command: The first byte in a 1 or 2 byte negotiation sequence.
			option: The second byte in a 2 byte negotiation sequence or None.
		"""

	@abstractmethod
	def on_unhandled_subnegotiation(self, option: bytes, data: bytes) -> None:
		"""
		Called for subnegotiations for which no handler is installed.

		Args:
			option: The subnegotiation option.
			data: The payload.
		"""

	@abstractmethod
	def on_enable_local(self, option: bytes) -> bool:
		"""
		Called to accept or reject the request for us to manage the option.

		Args:
			option: The option that peer requests us to handle.

		Returns:
			True if we will handle the option, False otherwise.
		"""
		return False  # Reject all options by default.

	@abstractmethod
	def on_disable_local(self, option: bytes) -> None:
		"""
		Disables a locally managed option.

		This method is called before we disable a
		locally enabled option, in order to perform any necessary cleanup.

		Note:
			If on_enable_local is overridden, this method must be overridden as well.

		Args:
			option: The option being disabled.
		"""
		raise NotImplementedError(f"Don't know how to disable local Telnet option {option!r}")

	@abstractmethod
	def on_enable_remote(self, option: bytes) -> bool:
		"""
		Called to accept or reject the request for peer to manage the option.

		Args:
			option: The option that peer wants to handle.

		Returns:
			True if we will allow peer to handle the option, False otherwise.
		"""
		return False  # Reject all options by default.

	@abstractmethod
	def on_disable_remote(self, option: bytes) -> None:
		"""
		Disables a remotely managed option.

		This method is called when peer disables a remotely enabled option,
		in order to perform any necessary cleanup on our end.

		Note:
			If on_enable_remote is overridden, this method must be overridden as well.

		Args:
			option: The option being disabled.
		"""
		raise NotImplementedError(f"Don't know how to disable remote Telnet option {option!r}")

	@abstractmethod
	def on_option_enabled(self, option: bytes) -> None:
		"""
		Called after an option has been fully enabled.

		Args:
			option: The option that has been enabled.
		"""

Attribute is_client: bool inherited property readonly

True if acting as a client, False otherwise.

Attribute is_server: bool inherited property readonly

True if acting as a server, False otherwise.

Method do(self, option)

Requests that the peer enable an option.

Parameters:

Name Type Description Default
option bytes

The option to enable.

required
Source code in mudproto/telnet.py
@abstractmethod
def do(self, option: bytes) -> None:
	"""
	Requests that the peer enable an option.

	Args:
		option: The option to enable.
	"""

Method dont(self, option)

Requests that the peer disable an option.

Parameters:

Name Type Description Default
option bytes

The option to disable.

required
Source code in mudproto/telnet.py
@abstractmethod
def dont(self, option: bytes) -> None:
	"""
	Requests that the peer disable an option.

	Args:
		option: The option to disable.
	"""

Method get_option_state(self, option)

Gets the state of a Telnet option.

Parameters:

Name Type Description Default
option bytes

The option to get state.

required

Returns:

Type Description
_OptionState

An object containing the option state.

Source code in mudproto/telnet.py
@abstractmethod
def get_option_state(self, option: bytes) -> _OptionState:
	"""
	Gets the state of a Telnet option.

	Args:
		option: The option to get state.

	Returns:
		An object containing the option state.
	"""

Method on_command(self, command, option)

Called when a 1 or 2 byte command is received.

Parameters:

Name Type Description Default
command bytes

The first byte in a 1 or 2 byte negotiation sequence.

required
option bytes | None

The second byte in a 2 byte negotiation sequence or None.

required
Source code in mudproto/telnet.py
@abstractmethod
def on_command(self, command: bytes, option: bytes | None) -> None:
	"""
	Called when a 1 or 2 byte command is received.

	Args:
		command: The first byte in a 1 or 2 byte negotiation sequence.
		option: The second byte in a 2 byte negotiation sequence or None.
	"""

Method on_connection_lost(self) inherited

Called by disconnect when a connection to peer has been lost.

Source code in mudproto/telnet.py
@abstractmethod
def on_connection_lost(self) -> None:
	"""Called by `disconnect` when a connection to peer has been lost."""

Method on_connection_made(self) inherited

Called by connect when a connection to peer has been established.

Source code in mudproto/telnet.py
@abstractmethod
def on_connection_made(self) -> None:
	"""Called by `connect` when a connection to peer has been established."""

Method on_data_received(self, data) inherited

Called by parse when data is received.

Parameters:

Name Type Description Default
data bytes

The received data.

required
Source code in mudproto/telnet.py
@abstractmethod
def on_data_received(self, data: bytes) -> None:
	"""
	Called by `parse` when data is received.

	Args:
		data: The received data.
	"""
	self._receiver(data)

Method on_disable_local(self, option)

Disables a locally managed option.

This method is called before we disable a locally enabled option, in order to perform any necessary cleanup.

Note

If on_enable_local is overridden, this method must be overridden as well.

Parameters:

Name Type Description Default
option bytes

The option being disabled.

required
Source code in mudproto/telnet.py
@abstractmethod
def on_disable_local(self, option: bytes) -> None:
	"""
	Disables a locally managed option.

	This method is called before we disable a
	locally enabled option, in order to perform any necessary cleanup.

	Note:
		If on_enable_local is overridden, this method must be overridden as well.

	Args:
		option: The option being disabled.
	"""
	raise NotImplementedError(f"Don't know how to disable local Telnet option {option!r}")

Method on_disable_remote(self, option)

Disables a remotely managed option.

This method is called when peer disables a remotely enabled option, in order to perform any necessary cleanup on our end.

Note

If on_enable_remote is overridden, this method must be overridden as well.

Parameters:

Name Type Description Default
option bytes

The option being disabled.

required
Source code in mudproto/telnet.py
@abstractmethod
def on_disable_remote(self, option: bytes) -> None:
	"""
	Disables a remotely managed option.

	This method is called when peer disables a remotely enabled option,
	in order to perform any necessary cleanup on our end.

	Note:
		If on_enable_remote is overridden, this method must be overridden as well.

	Args:
		option: The option being disabled.
	"""
	raise NotImplementedError(f"Don't know how to disable remote Telnet option {option!r}")

Method on_enable_local(self, option)

Called to accept or reject the request for us to manage the option.

Parameters:

Name Type Description Default
option bytes

The option that peer requests us to handle.

required

Returns:

Type Description
bool

True if we will handle the option, False otherwise.

Source code in mudproto/telnet.py
@abstractmethod
def on_enable_local(self, option: bytes) -> bool:
	"""
	Called to accept or reject the request for us to manage the option.

	Args:
		option: The option that peer requests us to handle.

	Returns:
		True if we will handle the option, False otherwise.
	"""
	return False  # Reject all options by default.

Method on_enable_remote(self, option)

Called to accept or reject the request for peer to manage the option.

Parameters:

Name Type Description Default
option bytes

The option that peer wants to handle.

required

Returns:

Type Description
bool

True if we will allow peer to handle the option, False otherwise.

Source code in mudproto/telnet.py
@abstractmethod
def on_enable_remote(self, option: bytes) -> bool:
	"""
	Called to accept or reject the request for peer to manage the option.

	Args:
		option: The option that peer wants to handle.

	Returns:
		True if we will allow peer to handle the option, False otherwise.
	"""
	return False  # Reject all options by default.

Method on_option_enabled(self, option)

Called after an option has been fully enabled.

Parameters:

Name Type Description Default
option bytes

The option that has been enabled.

required
Source code in mudproto/telnet.py
@abstractmethod
def on_option_enabled(self, option: bytes) -> None:
	"""
	Called after an option has been fully enabled.

	Args:
		option: The option that has been enabled.
	"""

Method on_subnegotiation(self, option, data)

Called when a subnegotiation is received.

Parameters:

Name Type Description Default
option bytes

The subnegotiation option.

required
data bytes

The payload.

required
Source code in mudproto/telnet.py
@abstractmethod
def on_subnegotiation(self, option: bytes, data: bytes) -> None:
	"""
	Called when a subnegotiation is received.

	Args:
		option: The subnegotiation option.
		data: The payload.
	"""

Method on_unhandled_command(self, command, option)

Called for commands for which no handler is installed.

Parameters:

Name Type Description Default
command bytes

The first byte in a 1 or 2 byte negotiation sequence.

required
option bytes | None

The second byte in a 2 byte negotiation sequence or None.

required
Source code in mudproto/telnet.py
@abstractmethod
def on_unhandled_command(self, command: bytes, option: bytes | None) -> None:
	"""
	Called for commands for which no handler is installed.

	Args:
		command: The first byte in a 1 or 2 byte negotiation sequence.
		option: The second byte in a 2 byte negotiation sequence or None.
	"""

Method on_unhandled_subnegotiation(self, option, data)

Called for subnegotiations for which no handler is installed.

Parameters:

Name Type Description Default
option bytes

The subnegotiation option.

required
data bytes

The payload.

required
Source code in mudproto/telnet.py
@abstractmethod
def on_unhandled_subnegotiation(self, option: bytes, data: bytes) -> None:
	"""
	Called for subnegotiations for which no handler is installed.

	Args:
		option: The subnegotiation option.
		data: The payload.
	"""

Method request_negotiation(self, option, data)

Sends a subnegotiation message to the peer.

Parameters:

Name Type Description Default
option bytes

The subnegotiation option.

required
data bytes

The payload.

required
Source code in mudproto/telnet.py
@abstractmethod
def request_negotiation(self, option: bytes, data: bytes) -> None:
	"""
	Sends a subnegotiation message to the peer.

	Args:
		option: The subnegotiation option.
		data: The payload.
	"""

Method will(self, option)

Indicates our willingness to enable an option.

Parameters:

Name Type Description Default
option bytes

The option to accept.

required
Source code in mudproto/telnet.py
@abstractmethod
def will(self, option: bytes) -> None:
	"""
	Indicates our willingness to enable an option.

	Args:
		option: The option to accept.
	"""

Method wont(self, option)

Indicates we are not willing to enable an option.

Parameters:

Name Type Description Default
option bytes

The option to reject.

required
Source code in mudproto/telnet.py
@abstractmethod
def wont(self, option: bytes) -> None:
	"""
	Indicates we are not willing to enable an option.

	Args:
		option: The option to reject.
	"""

Method write(self, data) inherited

Writes data to peer.

Parameters:

Name Type Description Default
data bytes

The bytes to be written.

required
Source code in mudproto/telnet.py
def write(self, data: bytes) -> None:
	"""
	Writes data to peer.

	Args:
		data: The bytes to be written.
	"""
	self._writer(data)

Class TelnetProtocol(TelnetInterface)

Implements the Telnet protocol.

Source code in mudproto/telnet.py
class TelnetProtocol(TelnetInterface):  # NOQA: PLR0904
	"""Implements the Telnet protocol."""

	def __init__(self, *args: Any, **kwargs: Any) -> None:
		"""
		Defines the constructor.

		Args:
			*args: Positional arguments to be passed to the parent constructor.
			**kwargs: Key-word only arguments to be passed to the parent constructor.
		"""
		super().__init__(*args, **kwargs)
		self.state: TelnetState = TelnetState.DATA
		"""The state of the state machine."""
		self._options: dict[bytes, _OptionState] = {}
		"""A mapping of option bytes to their current state."""
		# When a Telnet command is received, the command byte,
		# the first byte after IAC, is looked up in the commandMap dictionary.
		# If a callable is found, it is invoked with the argument of the command,
		# or None if the command takes no argument.  Values should be added to
		# this dictionary if commands wish to be handled.  By default,
		# only WILL, WONT, DO, and DONT are handled.  These should not
		# be overridden, as this class handles them correctly and
		# provides an API for interacting with them.
		self.command_map: TelnetCommandMapType = {
			WILL: self.on_will,
			WONT: self.on_wont,
			DO: self.on_do,
			DONT: self.on_dont,
		}
		# When a subnegotiation command is received, the option byte, the
		# first byte after SB, is looked up in the subnegotiationMap dictionary.  If
		# a callable is found, it is invoked with the argument of the
		# subnegotiation.  Values should be added to this dictionary if
		# subnegotiations are to be handled.  By default, no values are
		# handled.
		self.subnegotiation_map: TelnetSubnegotiationMapType = {}

	def _do(self, option: bytes) -> None:
		"""
		Sends IAC DO option to the peer.

		Args:
			option: The option to send.
		"""
		logger.debug(f"Send to peer: IAC DO {DESCRIPTIONS.get(option, repr(option))}")
		self.write(IAC + DO + option)

	def _dont(self, option: bytes) -> None:
		"""
		Sends IAC DONT option to the peer.

		Args:
			option: The option to send.
		"""
		logger.debug(f"Send to peer: IAC DONT {DESCRIPTIONS.get(option, repr(option))}")
		self.write(IAC + DONT + option)

	def _will(self, option: bytes) -> None:
		"""
		Sends IAC WILL option to the peer.

		Args:
			option: The option to send.
		"""
		logger.debug(f"Send to peer: IAC WILL {DESCRIPTIONS.get(option, repr(option))}")
		self.write(IAC + WILL + option)

	def _wont(self, option: bytes) -> None:
		"""
		Sends IAC WONT option to the peer.

		Args:
			option: The option to send.
		"""
		logger.debug(f"Send to peer: IAC WONT {DESCRIPTIONS.get(option, repr(option))}")
		self.write(IAC + WONT + option)

	def will(self, option: bytes) -> None:
		"""
		Tells peer we would like to enable a Telnet option.

		Args:
			option: The option we wish to enable.
		"""
		state = self.get_option_state(option)
		if state.us.negotiating or state.him.negotiating:
			logger.warning(
				f"We are offering to enable option {option!r}, but the option is "
				+ f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
			)
		elif state.us.enabled:
			logger.warning(f"Attempting to enable an already enabled option {option!r}.")
		else:
			state.us.negotiating = True
			self._will(option)

	def wont(self, option: bytes) -> None:
		"""
		Tells peer we no longer wish to enable a Telnet option.

		Args:
			option: The option we wish to disable.
		"""
		state = self.get_option_state(option)
		if state.us.negotiating or state.him.negotiating:
			logger.warning(
				f"We are refusing to enable option {option!r}, but the option is "
				+ f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
			)
		elif not state.us.enabled:
			logger.warning(f"Attempting to disable an already disabled option {option!r}.")
		else:
			state.us.negotiating = True
			self._wont(option)

	def do(self, option: bytes) -> None:
		"""
		Tells peer we would like him to enable a Telnet option.

		Args:
			option: The option we wish peer to enable.
		"""
		state = self.get_option_state(option)
		if state.us.negotiating or state.him.negotiating:
			logger.warning(
				f"We are requesting that peer enable option {option!r}, but the option is "
				+ f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
			)
		elif state.him.enabled:
			logger.warning(f"Requesting that peer enable an already enabled option {option!r}.")
		else:
			state.him.negotiating = True
			self._do(option)

	def dont(self, option: bytes) -> None:
		"""
		Tells peer we would like him to disable a Telnet option.

		Args:
			option: The option we wish to disable.
		"""
		state = self.get_option_state(option)
		if state.us.negotiating or state.him.negotiating:
			logger.warning(
				f"We are requesting that peer disable option {option!r}, but the option is "
				+ f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
			)
		elif not state.him.enabled:
			logger.warning(f"Requesting that peer disable an already disabled option {option!r}.")
		else:
			state.him.negotiating = True
			self._dont(option)

	def get_option_state(self, option: bytes) -> _OptionState:
		"""
		Retrieves the state of an option.

		Args:
			option: The option we wish to get the state of.

		Returns:
			The option state.
		"""
		if option not in self._options:
			self._options[option] = _OptionState()
		return self._options[option]

	def request_negotiation(self, option: bytes, data: bytes) -> None:
		"""
		Performs a Telnet sub-negotiation for the given option.

		Args:
			option: The option we are negotiating.
			data: The data we are sending in the body of the negotiation.
		"""
		self.write(IAC + SB + option + escape_iac(data) + IAC + SE)

	def on_connection_made(self) -> None:  # NOQA: D102
		return super().on_connection_made()  # type: ignore[safe-super]

	def on_connection_lost(self) -> None:  # NOQA: D102
		return super().on_connection_lost()  # type: ignore[safe-super]

	def on_data_received(self, data: bytes) -> None:  # NOQA: C901, D102, PLR0912, PLR0915
		app_data_buffer: bytearray = bytearray()
		while data:
			if self.state is TelnetState.DATA:
				app_data, separator, data = data.partition(IAC)
				if separator:
					self.state = TelnetState.COMMAND
				elif app_data.endswith(CR):
					self.state = TelnetState.NEWLINE
					app_data = app_data[:-1]
				app_data_buffer.extend(app_data.replace(CR_LF, LF).replace(CR_NULL, CR))
				continue
			byte, data = data[:1], data[1:]
			if self.state is TelnetState.COMMAND:
				if byte == IAC:
					# Escaped IAC.
					app_data_buffer.extend(byte)
					self.state = TelnetState.DATA
				elif byte == SE:
					self.state = TelnetState.DATA
					logger.warning("IAC SE received outside of subnegotiation.")
				elif byte == SB:
					self.state = TelnetState.SUBNEGOTIATION
					self._commands: bytearray = bytearray()
				elif byte in COMMAND_BYTES:
					self.state = TelnetState.DATA
					if app_data_buffer:
						super().on_data_received(bytes(app_data_buffer))
						app_data_buffer.clear()
					logger.debug(f"Received from peer: IAC {DESCRIPTIONS[byte]}")
					self.on_command(byte, None)
				elif byte in NEGOTIATION_BYTES:
					self.state = TelnetState.NEGOTIATION
					self._command = byte
				else:
					self.state = TelnetState.DATA
					logger.warning(f"Unknown Telnet command received {byte!r}.")
			elif self.state is TelnetState.NEGOTIATION:
				self.state = TelnetState.DATA
				command = self._command
				del self._command
				if app_data_buffer:
					super().on_data_received(bytes(app_data_buffer))
					app_data_buffer.clear()
				logger.debug(
					f"Received from peer: IAC {DESCRIPTIONS[command]} {DESCRIPTIONS.get(byte, repr(byte))}"
				)
				self.on_command(command, byte)
			elif self.state is TelnetState.NEWLINE:
				self.state = TelnetState.DATA
				if byte == LF:
					app_data_buffer.extend(byte)
				elif byte == NULL:
					app_data_buffer.extend(CR)
				elif byte == IAC:
					# IAC isn't really allowed after CR, according to the
					# RFC, but handling it this way is less surprising than
					# delivering the IAC to the app as application data.
					# The purpose of the restriction is to allow terminals
					# to unambiguously interpret the behavior of the CR
					# after reading only one more byte.  CR + LF is supposed
					# to mean one thing, cursor to next line, first column,
					# CR + NUL another, cursor to first column.  Absent the
					# NUL, it still makes sense to interpret this as CR and
					# then apply all the usual interpretation to the IAC.
					app_data_buffer.extend(CR)
					self.state = TelnetState.COMMAND
				else:
					app_data_buffer.extend(CR + byte)
			elif self.state is TelnetState.SUBNEGOTIATION:
				if byte == IAC:
					self.state = TelnetState.SUBNEGOTIATION_ESCAPED
				else:
					self._commands.extend(byte)
			elif self.state is TelnetState.SUBNEGOTIATION_ESCAPED:
				if byte == SE:
					self.state = TelnetState.DATA
					commands = bytes(self._commands)
					del self._commands
					if app_data_buffer:
						super().on_data_received(bytes(app_data_buffer))
						app_data_buffer.clear()
					option, commands = commands[:1], commands[1:]
					logger.debug(
						f"Received from peer: IAC SB {DESCRIPTIONS.get(option, repr(option))} "
						+ f"{commands!r} IAC SE"
					)
					self.on_subnegotiation(option, commands)
				else:
					self.state = TelnetState.SUBNEGOTIATION
					self._commands.extend(byte)
		if app_data_buffer:
			super().on_data_received(bytes(app_data_buffer))

	def on_command(self, command: bytes, option: bytes | None) -> None:  # NOQA: D102
		if command in self.command_map:
			self.command_map[command](option)
		else:
			self.on_unhandled_command(command, option)

	def on_subnegotiation(self, option: bytes, data: bytes) -> None:  # NOQA: D102
		if option in self.subnegotiation_map:
			self.subnegotiation_map[option](data)
		else:
			self.on_unhandled_subnegotiation(option, data)

	def on_will(self, option: bytes | None) -> None:
		"""
		Called when an IAC + WILL + option is received.

		Args:
			option: The received option.
		"""  # NOQA: DOC501
		if option is None:
			raise AssertionError("Option must not be None in this context.")
		state = self.get_option_state(option)
		if not state.him.enabled and not state.him.negotiating:
			# Peer is unilaterally offering to enable an option.
			if self.on_enable_remote(option):
				state.him.enabled = True
				self._do(option)
				self.on_option_enabled(option)
			else:
				self._dont(option)
		elif not state.him.enabled and state.him.negotiating:
			# Peer agreed to enable an option in response to our request.
			state.him.enabled = True
			state.him.negotiating = False
			if not self.on_enable_remote(option):
				raise AssertionError(f"enableRemote must return True in this context (for option {option!r})")
			self.on_option_enabled(option)
		elif state.him.enabled and not state.him.negotiating:
			# Peer is unilaterally offering to enable an already-enabled option.
			# Ignore this.
			pass
		elif state.him.enabled and state.him.negotiating:
			# This is a bogus state.  It is here for completeness.  It will
			# never be entered.
			raise AssertionError(
				"him.enabled and him.negotiating cannot be True at the same time. "
				+ f"state: {state!r}, option: {option!r}"
			)

	def on_wont(self, option: bytes | None) -> None:
		"""
		Called when an IAC + WONT + option is received.

		Args:
			option: The received option.
		"""  # NOQA: DOC501
		if option is None:
			raise AssertionError("Option must not be None in this context.")
		state = self.get_option_state(option)
		if not state.him.enabled and not state.him.negotiating:
			# Peer is unilaterally demanding that an already-disabled option be/remain disabled.
			# Ignore this, although we could record it and refuse subsequent enable attempts
			# from our side, peer could refuse them again, so we won't.
			pass
		elif not state.him.enabled and state.him.negotiating:
			# Peer refused to enable an option in response to our request.
			state.him.negotiating = False
			logger.debug(
				f"Peer refuses to enable option {DESCRIPTIONS.get(option, repr(option))} "
				+ "in response to our request."
			)
		elif state.him.enabled and not state.him.negotiating:
			# Peer is unilaterally demanding that an option be disabled.
			state.him.enabled = False
			self.on_disable_remote(option)
			self._dont(option)
		elif state.him.enabled and state.him.negotiating:
			# Peer agreed to disable an option at our request.
			state.him.enabled = False
			state.him.negotiating = False
			self.on_disable_remote(option)

	def on_do(self, option: bytes | None) -> None:
		"""
		Called when an IAC + DO + option is received.

		Args:
			option: The received option.
		"""  # NOQA: DOC501
		if option is None:
			raise AssertionError("Option must not be None in this context.")
		state = self.get_option_state(option)
		if not state.us.enabled and not state.us.negotiating:
			# Peer is unilaterally requesting that we enable an option.
			if self.on_enable_local(option):
				state.us.enabled = True
				self._will(option)
				self.on_option_enabled(option)
			else:
				self._wont(option)
		elif not state.us.enabled and state.us.negotiating:
			# Peer agreed to allow us to enable an option at our request.
			state.us.enabled = True
			state.us.negotiating = False
			self.on_enable_local(option)
			self.on_option_enabled(option)
		elif state.us.enabled and not state.us.negotiating:
			# Peer is unilaterally requesting us to enable an already-enabled option.
			# Ignore this.
			pass
		elif state.us.enabled and state.us.negotiating:
			# This is a bogus state.  It is here for completeness.  It will never be
			# entered.
			raise AssertionError(
				"us.enabled and us.negotiating cannot be True at the same time. "
				+ f"state: {state!r}, option: {option!r}"
			)

	def on_dont(self, option: bytes | None) -> None:
		"""
		Called when an IAC + DONT + option is received.

		Args:
			option: The received option.
		"""  # NOQA: DOC501
		if option is None:
			raise AssertionError("Option must not be None in this context.")
		state = self.get_option_state(option)
		if not state.us.enabled and not state.us.negotiating:
			# Peer is unilaterally demanding us to disable an already-disabled option.
			# Ignore this.
			pass
		elif not state.us.enabled and state.us.negotiating:
			# Offered option was refused.
			state.us.negotiating = False
			logger.debug(f"Peer rejects our offer to enable option {DESCRIPTIONS.get(option, repr(option))}.")
		elif state.us.enabled and not state.us.negotiating:
			# Peer is unilaterally demanding we disable an option.
			state.us.enabled = False
			self.on_disable_local(option)
			self._wont(option)
		elif state.us.enabled and state.us.negotiating:
			# Peer acknowledged our notice that we will disable an option.
			state.us.enabled = False
			state.us.negotiating = False
			self.on_disable_local(option)

	def on_unhandled_command(self, command: bytes, option: bytes | None) -> None:  # NOQA: D102
		return super().on_unhandled_command(command, option)  # type: ignore[safe-super]

	def on_unhandled_subnegotiation(self, option: bytes, data: bytes) -> None:  # NOQA: D102
		return super().on_unhandled_subnegotiation(option, data)  # type: ignore[safe-super]

	def on_enable_local(self, option: bytes) -> bool:  # NOQA: D102
		return super().on_enable_local(option)

	def on_disable_local(self, option: bytes) -> None:  # NOQA: D102
		return super().on_disable_local(option)  # type: ignore[safe-super]

	def on_enable_remote(self, option: bytes) -> bool:  # NOQA: D102
		return super().on_enable_remote(option)

	def on_disable_remote(self, option: bytes) -> None:  # NOQA: D102
		return super().on_disable_remote(option)  # type: ignore[safe-super]

	def on_option_enabled(self, option: bytes) -> None:  # NOQA: D102
		return super().on_option_enabled(option)  # type: ignore[safe-super]

Attribute is_client: bool inherited property readonly

True if acting as a client, False otherwise.

Attribute is_server: bool inherited property readonly

True if acting as a server, False otherwise.

Method __init__(self, *args, **kwargs) special

Defines the constructor.

Parameters:

Name Type Description Default
*args Any

Positional arguments to be passed to the parent constructor.

()
**kwargs Any

Key-word only arguments to be passed to the parent constructor.

{}
Source code in mudproto/telnet.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
	"""
	Defines the constructor.

	Args:
		*args: Positional arguments to be passed to the parent constructor.
		**kwargs: Key-word only arguments to be passed to the parent constructor.
	"""
	super().__init__(*args, **kwargs)
	self.state: TelnetState = TelnetState.DATA
	"""The state of the state machine."""
	self._options: dict[bytes, _OptionState] = {}
	"""A mapping of option bytes to their current state."""
	# When a Telnet command is received, the command byte,
	# the first byte after IAC, is looked up in the commandMap dictionary.
	# If a callable is found, it is invoked with the argument of the command,
	# or None if the command takes no argument.  Values should be added to
	# this dictionary if commands wish to be handled.  By default,
	# only WILL, WONT, DO, and DONT are handled.  These should not
	# be overridden, as this class handles them correctly and
	# provides an API for interacting with them.
	self.command_map: TelnetCommandMapType = {
		WILL: self.on_will,
		WONT: self.on_wont,
		DO: self.on_do,
		DONT: self.on_dont,
	}
	# When a subnegotiation command is received, the option byte, the
	# first byte after SB, is looked up in the subnegotiationMap dictionary.  If
	# a callable is found, it is invoked with the argument of the
	# subnegotiation.  Values should be added to this dictionary if
	# subnegotiations are to be handled.  By default, no values are
	# handled.
	self.subnegotiation_map: TelnetSubnegotiationMapType = {}

Method do(self, option)

Tells peer we would like him to enable a Telnet option.

Parameters:

Name Type Description Default
option bytes

The option we wish peer to enable.

required
Source code in mudproto/telnet.py
def do(self, option: bytes) -> None:
	"""
	Tells peer we would like him to enable a Telnet option.

	Args:
		option: The option we wish peer to enable.
	"""
	state = self.get_option_state(option)
	if state.us.negotiating or state.him.negotiating:
		logger.warning(
			f"We are requesting that peer enable option {option!r}, but the option is "
			+ f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
		)
	elif state.him.enabled:
		logger.warning(f"Requesting that peer enable an already enabled option {option!r}.")
	else:
		state.him.negotiating = True
		self._do(option)

Method dont(self, option)

Tells peer we would like him to disable a Telnet option.

Parameters:

Name Type Description Default
option bytes

The option we wish to disable.

required
Source code in mudproto/telnet.py
def dont(self, option: bytes) -> None:
	"""
	Tells peer we would like him to disable a Telnet option.

	Args:
		option: The option we wish to disable.
	"""
	state = self.get_option_state(option)
	if state.us.negotiating or state.him.negotiating:
		logger.warning(
			f"We are requesting that peer disable option {option!r}, but the option is "
			+ f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
		)
	elif not state.him.enabled:
		logger.warning(f"Requesting that peer disable an already disabled option {option!r}.")
	else:
		state.him.negotiating = True
		self._dont(option)

Method get_option_state(self, option)

Retrieves the state of an option.

Parameters:

Name Type Description Default
option bytes

The option we wish to get the state of.

required

Returns:

Type Description
_OptionState

The option state.

Source code in mudproto/telnet.py
def get_option_state(self, option: bytes) -> _OptionState:
	"""
	Retrieves the state of an option.

	Args:
		option: The option we wish to get the state of.

	Returns:
		The option state.
	"""
	if option not in self._options:
		self._options[option] = _OptionState()
	return self._options[option]

Method on_command(self, command, option)

Called when a 1 or 2 byte command is received.

Parameters:

Name Type Description Default
command bytes

The first byte in a 1 or 2 byte negotiation sequence.

required
option bytes | None

The second byte in a 2 byte negotiation sequence or None.

required
Source code in mudproto/telnet.py
def on_command(self, command: bytes, option: bytes | None) -> None:  # NOQA: D102
	if command in self.command_map:
		self.command_map[command](option)
	else:
		self.on_unhandled_command(command, option)

Method on_connection_lost(self)

Called by disconnect when a connection to peer has been lost.

Source code in mudproto/telnet.py
def on_connection_lost(self) -> None:  # NOQA: D102
	return super().on_connection_lost()  # type: ignore[safe-super]

Method on_connection_made(self)

Called by connect when a connection to peer has been established.

Source code in mudproto/telnet.py
def on_connection_made(self) -> None:  # NOQA: D102
	return super().on_connection_made()  # type: ignore[safe-super]

Method on_data_received(self, data)

Called by parse when data is received.

Parameters:

Name Type Description Default
data bytes

The received data.

required
Source code in mudproto/telnet.py
def on_data_received(self, data: bytes) -> None:  # NOQA: C901, D102, PLR0912, PLR0915
	app_data_buffer: bytearray = bytearray()
	while data:
		if self.state is TelnetState.DATA:
			app_data, separator, data = data.partition(IAC)
			if separator:
				self.state = TelnetState.COMMAND
			elif app_data.endswith(CR):
				self.state = TelnetState.NEWLINE
				app_data = app_data[:-1]
			app_data_buffer.extend(app_data.replace(CR_LF, LF).replace(CR_NULL, CR))
			continue
		byte, data = data[:1], data[1:]
		if self.state is TelnetState.COMMAND:
			if byte == IAC:
				# Escaped IAC.
				app_data_buffer.extend(byte)
				self.state = TelnetState.DATA
			elif byte == SE:
				self.state = TelnetState.DATA
				logger.warning("IAC SE received outside of subnegotiation.")
			elif byte == SB:
				self.state = TelnetState.SUBNEGOTIATION
				self._commands: bytearray = bytearray()
			elif byte in COMMAND_BYTES:
				self.state = TelnetState.DATA
				if app_data_buffer:
					super().on_data_received(bytes(app_data_buffer))
					app_data_buffer.clear()
				logger.debug(f"Received from peer: IAC {DESCRIPTIONS[byte]}")
				self.on_command(byte, None)
			elif byte in NEGOTIATION_BYTES:
				self.state = TelnetState.NEGOTIATION
				self._command = byte
			else:
				self.state = TelnetState.DATA
				logger.warning(f"Unknown Telnet command received {byte!r}.")
		elif self.state is TelnetState.NEGOTIATION:
			self.state = TelnetState.DATA
			command = self._command
			del self._command
			if app_data_buffer:
				super().on_data_received(bytes(app_data_buffer))
				app_data_buffer.clear()
			logger.debug(
				f"Received from peer: IAC {DESCRIPTIONS[command]} {DESCRIPTIONS.get(byte, repr(byte))}"
			)
			self.on_command(command, byte)
		elif self.state is TelnetState.NEWLINE:
			self.state = TelnetState.DATA
			if byte == LF:
				app_data_buffer.extend(byte)
			elif byte == NULL:
				app_data_buffer.extend(CR)
			elif byte == IAC:
				# IAC isn't really allowed after CR, according to the
				# RFC, but handling it this way is less surprising than
				# delivering the IAC to the app as application data.
				# The purpose of the restriction is to allow terminals
				# to unambiguously interpret the behavior of the CR
				# after reading only one more byte.  CR + LF is supposed
				# to mean one thing, cursor to next line, first column,
				# CR + NUL another, cursor to first column.  Absent the
				# NUL, it still makes sense to interpret this as CR and
				# then apply all the usual interpretation to the IAC.
				app_data_buffer.extend(CR)
				self.state = TelnetState.COMMAND
			else:
				app_data_buffer.extend(CR + byte)
		elif self.state is TelnetState.SUBNEGOTIATION:
			if byte == IAC:
				self.state = TelnetState.SUBNEGOTIATION_ESCAPED
			else:
				self._commands.extend(byte)
		elif self.state is TelnetState.SUBNEGOTIATION_ESCAPED:
			if byte == SE:
				self.state = TelnetState.DATA
				commands = bytes(self._commands)
				del self._commands
				if app_data_buffer:
					super().on_data_received(bytes(app_data_buffer))
					app_data_buffer.clear()
				option, commands = commands[:1], commands[1:]
				logger.debug(
					f"Received from peer: IAC SB {DESCRIPTIONS.get(option, repr(option))} "
					+ f"{commands!r} IAC SE"
				)
				self.on_subnegotiation(option, commands)
			else:
				self.state = TelnetState.SUBNEGOTIATION
				self._commands.extend(byte)
	if app_data_buffer:
		super().on_data_received(bytes(app_data_buffer))

Method on_disable_local(self, option)

Disables a locally managed option.

This method is called before we disable a locally enabled option, in order to perform any necessary cleanup.

Note

If on_enable_local is overridden, this method must be overridden as well.

Parameters:

Name Type Description Default
option bytes

The option being disabled.

required
Source code in mudproto/telnet.py
def on_disable_local(self, option: bytes) -> None:  # NOQA: D102
	return super().on_disable_local(option)  # type: ignore[safe-super]

Method on_disable_remote(self, option)

Disables a remotely managed option.

This method is called when peer disables a remotely enabled option, in order to perform any necessary cleanup on our end.

Note

If on_enable_remote is overridden, this method must be overridden as well.

Parameters:

Name Type Description Default
option bytes

The option being disabled.

required
Source code in mudproto/telnet.py
def on_disable_remote(self, option: bytes) -> None:  # NOQA: D102
	return super().on_disable_remote(option)  # type: ignore[safe-super]

Method on_do(self, option)

Called when an IAC + DO + option is received.

Parameters:

Name Type Description Default
option bytes | None

The received option.

required
Source code in mudproto/telnet.py
def on_do(self, option: bytes | None) -> None:
	"""
	Called when an IAC + DO + option is received.

	Args:
		option: The received option.
	"""  # NOQA: DOC501
	if option is None:
		raise AssertionError("Option must not be None in this context.")
	state = self.get_option_state(option)
	if not state.us.enabled and not state.us.negotiating:
		# Peer is unilaterally requesting that we enable an option.
		if self.on_enable_local(option):
			state.us.enabled = True
			self._will(option)
			self.on_option_enabled(option)
		else:
			self._wont(option)
	elif not state.us.enabled and state.us.negotiating:
		# Peer agreed to allow us to enable an option at our request.
		state.us.enabled = True
		state.us.negotiating = False
		self.on_enable_local(option)
		self.on_option_enabled(option)
	elif state.us.enabled and not state.us.negotiating:
		# Peer is unilaterally requesting us to enable an already-enabled option.
		# Ignore this.
		pass
	elif state.us.enabled and state.us.negotiating:
		# This is a bogus state.  It is here for completeness.  It will never be
		# entered.
		raise AssertionError(
			"us.enabled and us.negotiating cannot be True at the same time. "
			+ f"state: {state!r}, option: {option!r}"
		)

Method on_dont(self, option)

Called when an IAC + DONT + option is received.

Parameters:

Name Type Description Default
option bytes | None

The received option.

required
Source code in mudproto/telnet.py
def on_dont(self, option: bytes | None) -> None:
	"""
	Called when an IAC + DONT + option is received.

	Args:
		option: The received option.
	"""  # NOQA: DOC501
	if option is None:
		raise AssertionError("Option must not be None in this context.")
	state = self.get_option_state(option)
	if not state.us.enabled and not state.us.negotiating:
		# Peer is unilaterally demanding us to disable an already-disabled option.
		# Ignore this.
		pass
	elif not state.us.enabled and state.us.negotiating:
		# Offered option was refused.
		state.us.negotiating = False
		logger.debug(f"Peer rejects our offer to enable option {DESCRIPTIONS.get(option, repr(option))}.")
	elif state.us.enabled and not state.us.negotiating:
		# Peer is unilaterally demanding we disable an option.
		state.us.enabled = False
		self.on_disable_local(option)
		self._wont(option)
	elif state.us.enabled and state.us.negotiating:
		# Peer acknowledged our notice that we will disable an option.
		state.us.enabled = False
		state.us.negotiating = False
		self.on_disable_local(option)

Method on_enable_local(self, option)

Called to accept or reject the request for us to manage the option.

Parameters:

Name Type Description Default
option bytes

The option that peer requests us to handle.

required

Returns:

Type Description
bool

True if we will handle the option, False otherwise.

Source code in mudproto/telnet.py
def on_enable_local(self, option: bytes) -> bool:  # NOQA: D102
	return super().on_enable_local(option)

Method on_enable_remote(self, option)

Called to accept or reject the request for peer to manage the option.

Parameters:

Name Type Description Default
option bytes

The option that peer wants to handle.

required

Returns:

Type Description
bool

True if we will allow peer to handle the option, False otherwise.

Source code in mudproto/telnet.py
def on_enable_remote(self, option: bytes) -> bool:  # NOQA: D102
	return super().on_enable_remote(option)

Method on_option_enabled(self, option)

Called after an option has been fully enabled.

Parameters:

Name Type Description Default
option bytes

The option that has been enabled.

required
Source code in mudproto/telnet.py
def on_option_enabled(self, option: bytes) -> None:  # NOQA: D102
	return super().on_option_enabled(option)  # type: ignore[safe-super]

Method on_subnegotiation(self, option, data)

Called when a subnegotiation is received.

Parameters:

Name Type Description Default
option bytes

The subnegotiation option.

required
data bytes

The payload.

required
Source code in mudproto/telnet.py
def on_subnegotiation(self, option: bytes, data: bytes) -> None:  # NOQA: D102
	if option in self.subnegotiation_map:
		self.subnegotiation_map[option](data)
	else:
		self.on_unhandled_subnegotiation(option, data)

Method on_unhandled_command(self, command, option)

Called for commands for which no handler is installed.

Parameters:

Name Type Description Default
command bytes

The first byte in a 1 or 2 byte negotiation sequence.

required
option bytes | None

The second byte in a 2 byte negotiation sequence or None.

required
Source code in mudproto/telnet.py
def on_unhandled_command(self, command: bytes, option: bytes | None) -> None:  # NOQA: D102
	return super().on_unhandled_command(command, option)  # type: ignore[safe-super]

Method on_unhandled_subnegotiation(self, option, data)

Called for subnegotiations for which no handler is installed.

Parameters:

Name Type Description Default
option bytes

The subnegotiation option.

required
data bytes

The payload.

required
Source code in mudproto/telnet.py
def on_unhandled_subnegotiation(self, option: bytes, data: bytes) -> None:  # NOQA: D102
	return super().on_unhandled_subnegotiation(option, data)  # type: ignore[safe-super]

Method on_will(self, option)

Called when an IAC + WILL + option is received.

Parameters:

Name Type Description Default
option bytes | None

The received option.

required
Source code in mudproto/telnet.py
def on_will(self, option: bytes | None) -> None:
	"""
	Called when an IAC + WILL + option is received.

	Args:
		option: The received option.
	"""  # NOQA: DOC501
	if option is None:
		raise AssertionError("Option must not be None in this context.")
	state = self.get_option_state(option)
	if not state.him.enabled and not state.him.negotiating:
		# Peer is unilaterally offering to enable an option.
		if self.on_enable_remote(option):
			state.him.enabled = True
			self._do(option)
			self.on_option_enabled(option)
		else:
			self._dont(option)
	elif not state.him.enabled and state.him.negotiating:
		# Peer agreed to enable an option in response to our request.
		state.him.enabled = True
		state.him.negotiating = False
		if not self.on_enable_remote(option):
			raise AssertionError(f"enableRemote must return True in this context (for option {option!r})")
		self.on_option_enabled(option)
	elif state.him.enabled and not state.him.negotiating:
		# Peer is unilaterally offering to enable an already-enabled option.
		# Ignore this.
		pass
	elif state.him.enabled and state.him.negotiating:
		# This is a bogus state.  It is here for completeness.  It will
		# never be entered.
		raise AssertionError(
			"him.enabled and him.negotiating cannot be True at the same time. "
			+ f"state: {state!r}, option: {option!r}"
		)

Method on_wont(self, option)

Called when an IAC + WONT + option is received.

Parameters:

Name Type Description Default
option bytes | None

The received option.

required
Source code in mudproto/telnet.py
def on_wont(self, option: bytes | None) -> None:
	"""
	Called when an IAC + WONT + option is received.

	Args:
		option: The received option.
	"""  # NOQA: DOC501
	if option is None:
		raise AssertionError("Option must not be None in this context.")
	state = self.get_option_state(option)
	if not state.him.enabled and not state.him.negotiating:
		# Peer is unilaterally demanding that an already-disabled option be/remain disabled.
		# Ignore this, although we could record it and refuse subsequent enable attempts
		# from our side, peer could refuse them again, so we won't.
		pass
	elif not state.him.enabled and state.him.negotiating:
		# Peer refused to enable an option in response to our request.
		state.him.negotiating = False
		logger.debug(
			f"Peer refuses to enable option {DESCRIPTIONS.get(option, repr(option))} "
			+ "in response to our request."
		)
	elif state.him.enabled and not state.him.negotiating:
		# Peer is unilaterally demanding that an option be disabled.
		state.him.enabled = False
		self.on_disable_remote(option)
		self._dont(option)
	elif state.him.enabled and state.him.negotiating:
		# Peer agreed to disable an option at our request.
		state.him.enabled = False
		state.him.negotiating = False
		self.on_disable_remote(option)

Method request_negotiation(self, option, data)

Performs a Telnet sub-negotiation for the given option.

Parameters:

Name Type Description Default
option bytes

The option we are negotiating.

required
data bytes

The data we are sending in the body of the negotiation.

required
Source code in mudproto/telnet.py
def request_negotiation(self, option: bytes, data: bytes) -> None:
	"""
	Performs a Telnet sub-negotiation for the given option.

	Args:
		option: The option we are negotiating.
		data: The data we are sending in the body of the negotiation.
	"""
	self.write(IAC + SB + option + escape_iac(data) + IAC + SE)

Method will(self, option)

Tells peer we would like to enable a Telnet option.

Parameters:

Name Type Description Default
option bytes

The option we wish to enable.

required
Source code in mudproto/telnet.py
def will(self, option: bytes) -> None:
	"""
	Tells peer we would like to enable a Telnet option.

	Args:
		option: The option we wish to enable.
	"""
	state = self.get_option_state(option)
	if state.us.negotiating or state.him.negotiating:
		logger.warning(
			f"We are offering to enable option {option!r}, but the option is "
			+ f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
		)
	elif state.us.enabled:
		logger.warning(f"Attempting to enable an already enabled option {option!r}.")
	else:
		state.us.negotiating = True
		self._will(option)

Method wont(self, option)

Tells peer we no longer wish to enable a Telnet option.

Parameters:

Name Type Description Default
option bytes

The option we wish to disable.

required
Source code in mudproto/telnet.py
def wont(self, option: bytes) -> None:
	"""
	Tells peer we no longer wish to enable a Telnet option.

	Args:
		option: The option we wish to disable.
	"""
	state = self.get_option_state(option)
	if state.us.negotiating or state.him.negotiating:
		logger.warning(
			f"We are refusing to enable option {option!r}, but the option is "
			+ f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
		)
	elif not state.us.enabled:
		logger.warning(f"Attempting to disable an already disabled option {option!r}.")
	else:
		state.us.negotiating = True
		self._wont(option)

Method write(self, data) inherited

Writes data to peer.

Parameters:

Name Type Description Default
data bytes

The bytes to be written.

required
Source code in mudproto/telnet.py
def write(self, data: bytes) -> None:
	"""
	Writes data to peer.

	Args:
		data: The bytes to be written.
	"""
	self._writer(data)

Class TelnetState(Enum)

Valid states for the state machine.

Source code in mudproto/telnet.py
class TelnetState(Enum):
	"""Valid states for the state machine."""

	DATA = auto()
	COMMAND = auto()
	NEWLINE = auto()
	NEGOTIATION = auto()
	SUBNEGOTIATION = auto()
	SUBNEGOTIATION_ESCAPED = auto()

Function escape_iac(data)

Escapes IAC bytes of a bytes-like object.

Parameters:

Name Type Description Default
data bytes

The data to be escaped.

required

Returns:

Type Description
bytes

The data with IAC bytes escaped.

Source code in mudproto/telnet.py
def escape_iac(data: bytes) -> bytes:
	"""
	Escapes IAC bytes of a bytes-like object.

	Args:
		data: The data to be escaped.

	Returns:
		The data with IAC bytes escaped.
	"""
	return data.replace(IAC, IAC_IAC)