Module mudproto.telnet
¶
Telnet protocol.
Class TelnetError(Exception)
¶
Implements the base class for Telnet exceptions.
Source code in mudproto/telnet.py
class TelnetError(Exception):
    """Root of the exception hierarchy used for Telnet-related failures."""
Class TelnetInterface(ConnectionInterface)
¶
Defines the interface for the Telnet protocol.
Source code in mudproto/telnet.py
class TelnetInterface(ConnectionInterface):
    """Abstract contract that a Telnet protocol implementation must fulfil."""

    command_map: TelnetCommandMapType
    """Callables that handle Telnet commands, keyed by bytes."""

    subnegotiation_map: TelnetSubnegotiationMapType
    """Callables that handle Telnet subnegotiations, keyed by bytes."""

    @abstractmethod
    def will(self, option: bytes) -> None:
        """
        Offers to enable an option on our side.

        Args:
            option: The option we are willing to enable.
        """

    @abstractmethod
    def wont(self, option: bytes) -> None:
        """
        Declares that we are not willing to enable an option on our side.

        Args:
            option: The option we refuse.
        """

    @abstractmethod
    def do(self, option: bytes) -> None:
        """
        Asks the peer to enable an option on its side.

        Args:
            option: The option the peer should enable.
        """

    @abstractmethod
    def dont(self, option: bytes) -> None:
        """
        Asks the peer to disable an option on its side.

        Args:
            option: The option the peer should disable.
        """

    @abstractmethod
    def get_option_state(self, option: bytes) -> _OptionState:
        """
        Looks up the negotiation state of a Telnet option.

        Args:
            option: The option whose state is wanted.

        Returns:
            An object containing the option state.
        """

    @abstractmethod
    def request_negotiation(self, option: bytes, data: bytes) -> None:
        """
        Sends a subnegotiation message to the peer.

        Args:
            option: The option the subnegotiation concerns.
            data: The subnegotiation payload.
        """

    @abstractmethod
    def on_command(self, command: bytes, option: bytes | None) -> None:
        """
        Hook invoked when a 1 or 2 byte command arrives.

        Args:
            command: The first byte of the 1 or 2 byte sequence.
            option: The second byte of a 2 byte sequence, or None for a 1 byte command.
        """

    @abstractmethod
    def on_subnegotiation(self, option: bytes, data: bytes) -> None:
        """
        Hook invoked when a subnegotiation arrives.

        Args:
            option: The option the subnegotiation concerns.
            data: The subnegotiation payload.
        """

    @abstractmethod
    def on_unhandled_command(self, command: bytes, option: bytes | None) -> None:
        """
        Hook invoked for a command that has no installed handler.

        Args:
            command: The first byte of the 1 or 2 byte sequence.
            option: The second byte of a 2 byte sequence, or None for a 1 byte command.
        """

    @abstractmethod
    def on_unhandled_subnegotiation(self, option: bytes, data: bytes) -> None:
        """
        Hook invoked for a subnegotiation that has no installed handler.

        Args:
            option: The option the subnegotiation concerns.
            data: The subnegotiation payload.
        """

    @abstractmethod
    def on_enable_local(self, option: bytes) -> bool:
        """
        Decides whether we accept the peer's request that we manage an option.

        Args:
            option: The option the peer asks us to handle.

        Returns:
            True if we will handle the option, False otherwise.
        """
        return False  # Reject all options by default.

    @abstractmethod
    def on_disable_local(self, option: bytes) -> None:
        """
        Cleans up before a locally enabled option is disabled.

        Note:
            If on_enable_local is overridden, this method must be overridden as well.

        Args:
            option: The option being disabled.

        Raises:
            NotImplementedError: Always, in this default implementation.
        """
        raise NotImplementedError(f"Don't know how to disable local Telnet option {option!r}")

    @abstractmethod
    def on_enable_remote(self, option: bytes) -> bool:
        """
        Decides whether we accept the peer's request to manage an option itself.

        Args:
            option: The option the peer wants to handle.

        Returns:
            True if we will allow peer to handle the option, False otherwise.
        """
        return False  # Reject all options by default.

    @abstractmethod
    def on_disable_remote(self, option: bytes) -> None:
        """
        Cleans up on our end when the peer disables a remotely enabled option.

        Note:
            If on_enable_remote is overridden, this method must be overridden as well.

        Args:
            option: The option being disabled.

        Raises:
            NotImplementedError: Always, in this default implementation.
        """
        raise NotImplementedError(f"Don't know how to disable remote Telnet option {option!r}")

    @abstractmethod
    def on_option_enabled(self, option: bytes) -> None:
        """
        Hook invoked once an option has been fully enabled.

        Args:
            option: The option that is now enabled.
        """
Attribute is_client: bool
inherited
property
readonly
¶
True if acting as a client, False otherwise.
Attribute is_server: bool
inherited
property
readonly
¶
True if acting as a server, False otherwise.
Method do(self, option)
¶
Requests that the peer enable an option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option to enable. |
required |
Source code in mudproto/telnet.py
@abstractmethod
def do(self, option: bytes) -> None:
    """
    Asks the peer to enable an option on its side.

    Args:
        option: The option the peer should enable.
    """
Method dont(self, option)
¶
Requests that the peer disable an option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option to disable. |
required |
Source code in mudproto/telnet.py
@abstractmethod
def dont(self, option: bytes) -> None:
    """
    Asks the peer to disable an option on its side.

    Args:
        option: The option the peer should disable.
    """
Method get_option_state(self, option)
¶
Gets the state of a Telnet option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| option | bytes | The option whose state to retrieve. | required |
Returns:
| Type | Description |
|---|---|
| _OptionState | An object containing the option state. |
Source code in mudproto/telnet.py
@abstractmethod
def get_option_state(self, option: bytes) -> _OptionState:
    """
    Looks up the negotiation state of a Telnet option.

    Args:
        option: The option whose state is wanted.

    Returns:
        An object containing the option state.
    """
Method on_command(self, command, option)
¶
Called when a 1 or 2 byte command is received.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
command |
bytes |
The first byte in a 1 or 2 byte negotiation sequence. |
required |
option |
bytes | None |
The second byte in a 2 byte negotiation sequence or None. |
required |
Source code in mudproto/telnet.py
@abstractmethod
def on_command(self, command: bytes, option: bytes | None) -> None:
"""
Called when a 1 or 2 byte command is received.
Args:
command: The first byte in a 1 or 2 byte negotiation sequence.
option: The second byte in a 2 byte negotiation sequence or None.
"""
Method on_connection_lost(self)
inherited
¶
Called by disconnect when a connection to peer has been lost.
Source code in mudproto/telnet.py
@abstractmethod
def on_connection_lost(self) -> None:
    """Hook that `disconnect` invokes once the connection to peer is gone."""
Method on_connection_made(self)
inherited
¶
Called by connect when a connection to peer has been established.
Source code in mudproto/telnet.py
@abstractmethod
def on_connection_made(self) -> None:
    """Hook that `connect` invokes once the connection to peer is up."""
Method on_data_received(self, data)
inherited
¶
Called by parse when data is received.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data |
bytes |
The received data. |
required |
Source code in mudproto/telnet.py
@abstractmethod
def on_data_received(self, data: bytes) -> None:
    """
    Hook that `parse` invokes with newly received data.

    The default implementation hands the data to the `_receiver` callable.

    Args:
        data: The bytes that were received.
    """
    deliver = self._receiver
    deliver(data)
Method on_disable_local(self, option)
¶
Disables a locally managed option.
This method is called before we disable a locally enabled option, in order to perform any necessary cleanup.
Note
If on_enable_local is overridden, this method must be overridden as well.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option being disabled. |
required |
Source code in mudproto/telnet.py
@abstractmethod
def on_disable_local(self, option: bytes) -> None:
    """
    Cleans up before a locally enabled option is disabled.

    Note:
        If on_enable_local is overridden, this method must be overridden as well.

    Args:
        option: The option being disabled.

    Raises:
        NotImplementedError: Always, in this default implementation.
    """
    message = f"Don't know how to disable local Telnet option {option!r}"
    raise NotImplementedError(message)
Method on_disable_remote(self, option)
¶
Disables a remotely managed option.
This method is called when peer disables a remotely enabled option, in order to perform any necessary cleanup on our end.
Note
If on_enable_remote is overridden, this method must be overridden as well.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option being disabled. |
required |
Source code in mudproto/telnet.py
@abstractmethod
def on_disable_remote(self, option: bytes) -> None:
    """
    Cleans up on our end when the peer disables a remotely enabled option.

    Note:
        If on_enable_remote is overridden, this method must be overridden as well.

    Args:
        option: The option being disabled.

    Raises:
        NotImplementedError: Always, in this default implementation.
    """
    message = f"Don't know how to disable remote Telnet option {option!r}"
    raise NotImplementedError(message)
Method on_enable_local(self, option)
¶
Called to accept or reject the request for us to manage the option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option that peer requests us to handle. |
required |
Returns:
| Type | Description |
|---|---|
bool |
True if we will handle the option, False otherwise. |
Source code in mudproto/telnet.py
@abstractmethod
def on_enable_local(self, option: bytes) -> bool:
    """
    Decides whether we accept the peer's request that we manage an option.

    Args:
        option: The option the peer asks us to handle.

    Returns:
        True if we will handle the option, False otherwise.
    """
    accepted = False  # Reject all options by default.
    return accepted
Method on_enable_remote(self, option)
¶
Called to accept or reject the request for peer to manage the option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option that peer wants to handle. |
required |
Returns:
| Type | Description |
|---|---|
bool |
True if we will allow peer to handle the option, False otherwise. |
Source code in mudproto/telnet.py
@abstractmethod
def on_enable_remote(self, option: bytes) -> bool:
    """
    Decides whether we accept the peer's request to manage an option itself.

    Args:
        option: The option the peer wants to handle.

    Returns:
        True if we will allow peer to handle the option, False otherwise.
    """
    accepted = False  # Reject all options by default.
    return accepted
Method on_option_enabled(self, option)
¶
Called after an option has been fully enabled.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option that has been enabled. |
required |
Source code in mudproto/telnet.py
@abstractmethod
def on_option_enabled(self, option: bytes) -> None:
    """
    Hook invoked once an option has been fully enabled.

    Args:
        option: The option that is now enabled.
    """
Method on_subnegotiation(self, option, data)
¶
Called when a subnegotiation is received.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The subnegotiation option. |
required |
data |
bytes |
The payload. |
required |
Source code in mudproto/telnet.py
@abstractmethod
def on_subnegotiation(self, option: bytes, data: bytes) -> None:
    """
    Hook invoked when a subnegotiation arrives.

    Args:
        option: The option the subnegotiation concerns.
        data: The subnegotiation payload.
    """
Method on_unhandled_command(self, command, option)
¶
Called for commands for which no handler is installed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
command |
bytes |
The first byte in a 1 or 2 byte negotiation sequence. |
required |
option |
bytes | None |
The second byte in a 2 byte negotiation sequence or None. |
required |
Source code in mudproto/telnet.py
@abstractmethod
def on_unhandled_command(self, command: bytes, option: bytes | None) -> None:
"""
Called for commands for which no handler is installed.
Args:
command: The first byte in a 1 or 2 byte negotiation sequence.
option: The second byte in a 2 byte negotiation sequence or None.
"""
Method on_unhandled_subnegotiation(self, option, data)
¶
Called for subnegotiations for which no handler is installed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The subnegotiation option. |
required |
data |
bytes |
The payload. |
required |
Source code in mudproto/telnet.py
@abstractmethod
def on_unhandled_subnegotiation(self, option: bytes, data: bytes) -> None:
    """
    Hook invoked for a subnegotiation that has no installed handler.

    Args:
        option: The option the subnegotiation concerns.
        data: The subnegotiation payload.
    """
Method request_negotiation(self, option, data)
¶
Sends a subnegotiation message to the peer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The subnegotiation option. |
required |
data |
bytes |
The payload. |
required |
Source code in mudproto/telnet.py
@abstractmethod
def request_negotiation(self, option: bytes, data: bytes) -> None:
    """
    Sends a subnegotiation message to the peer.

    Args:
        option: The option the subnegotiation concerns.
        data: The subnegotiation payload.
    """
Method will(self, option)
¶
Indicates our willingness to enable an option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option to accept. |
required |
Source code in mudproto/telnet.py
@abstractmethod
def will(self, option: bytes) -> None:
    """
    Offers to enable an option on our side.

    Args:
        option: The option we are willing to enable.
    """
Method wont(self, option)
¶
Indicates we are not willing to enable an option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option to reject. |
required |
Source code in mudproto/telnet.py
@abstractmethod
def wont(self, option: bytes) -> None:
    """
    Declares that we are not willing to enable an option on our side.

    Args:
        option: The option we refuse.
    """
Method write(self, data)
inherited
¶
Writes data to peer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data |
bytes |
The bytes to be written. |
required |
Source code in mudproto/telnet.py
def write(self, data: bytes) -> None:
    """
    Sends bytes to the peer by handing them to the `_writer` callable.

    Args:
        data: The bytes to be written.
    """
    send = self._writer
    send(data)
Class TelnetProtocol(TelnetInterface)
¶
Implements the Telnet protocol.
Source code in mudproto/telnet.py
class TelnetProtocol(TelnetInterface):  # NOQA: PLR0904
    """
    Implements the Telnet protocol.

    Received bytes are run through a state machine (see `on_data_received`) that
    separates application data from commands, option negotiations, and
    subnegotiations. Negotiation state is tracked per option, separately for
    our side (`us`) and for the peer's side (`him`).
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Defines the constructor.

        Args:
            *args: Positional arguments to be passed to the parent constructor.
            **kwargs: Keyword arguments to be passed to the parent constructor.
        """
        super().__init__(*args, **kwargs)
        self.state: TelnetState = TelnetState.DATA
        """The state of the state machine."""
        self._options: dict[bytes, _OptionState] = {}
        """A mapping of option bytes to their current state."""
        # When a Telnet command is received, the command byte,
        # the first byte after IAC, is looked up in the command_map dictionary.
        # If a callable is found, it is invoked with the argument of the command,
        # or None if the command takes no argument. Values should be added to
        # this dictionary if commands wish to be handled. By default,
        # only WILL, WONT, DO, and DONT are handled. These should not
        # be overridden, as this class handles them correctly and
        # provides an API for interacting with them.
        self.command_map: TelnetCommandMapType = {
            WILL: self.on_will,
            WONT: self.on_wont,
            DO: self.on_do,
            DONT: self.on_dont,
        }
        # When a subnegotiation command is received, the option byte, the
        # first byte after SB, is looked up in the subnegotiation_map dictionary. If
        # a callable is found, it is invoked with the argument of the
        # subnegotiation. Values should be added to this dictionary if
        # subnegotiations are to be handled. By default, no values are
        # handled.
        self.subnegotiation_map: TelnetSubnegotiationMapType = {}

    def _do(self, option: bytes) -> None:
        """
        Sends IAC DO option to the peer.

        Args:
            option: The option to send.
        """
        logger.debug(f"Send to peer: IAC DO {DESCRIPTIONS.get(option, repr(option))}")
        self.write(IAC + DO + option)

    def _dont(self, option: bytes) -> None:
        """
        Sends IAC DONT option to the peer.

        Args:
            option: The option to send.
        """
        logger.debug(f"Send to peer: IAC DONT {DESCRIPTIONS.get(option, repr(option))}")
        self.write(IAC + DONT + option)

    def _will(self, option: bytes) -> None:
        """
        Sends IAC WILL option to the peer.

        Args:
            option: The option to send.
        """
        logger.debug(f"Send to peer: IAC WILL {DESCRIPTIONS.get(option, repr(option))}")
        self.write(IAC + WILL + option)

    def _wont(self, option: bytes) -> None:
        """
        Sends IAC WONT option to the peer.

        Args:
            option: The option to send.
        """
        logger.debug(f"Send to peer: IAC WONT {DESCRIPTIONS.get(option, repr(option))}")
        self.write(IAC + WONT + option)

    def will(self, option: bytes) -> None:
        """
        Tells peer we would like to enable a Telnet option.

        Nothing is sent (only a warning is logged) if the option is already
        being negotiated by either side, or is already enabled on our side.

        Args:
            option: The option we wish to enable.
        """
        state = self.get_option_state(option)
        if state.us.negotiating or state.him.negotiating:
            logger.warning(
                f"We are offering to enable option {option!r}, but the option is "
                + f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
            )
        elif state.us.enabled:
            logger.warning(f"Attempting to enable an already enabled option {option!r}.")
        else:
            state.us.negotiating = True
            self._will(option)

    def wont(self, option: bytes) -> None:
        """
        Tells peer we no longer wish to enable a Telnet option.

        Nothing is sent (only a warning is logged) if the option is already
        being negotiated by either side, or is not enabled on our side.

        Args:
            option: The option we wish to disable.
        """
        state = self.get_option_state(option)
        if state.us.negotiating or state.him.negotiating:
            logger.warning(
                f"We are refusing to enable option {option!r}, but the option is "
                + f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
            )
        elif not state.us.enabled:
            logger.warning(f"Attempting to disable an already disabled option {option!r}.")
        else:
            state.us.negotiating = True
            self._wont(option)

    def do(self, option: bytes) -> None:
        """
        Tells peer we would like him to enable a Telnet option.

        Nothing is sent (only a warning is logged) if the option is already
        being negotiated by either side, or is already enabled on the peer's side.

        Args:
            option: The option we wish peer to enable.
        """
        state = self.get_option_state(option)
        if state.us.negotiating or state.him.negotiating:
            logger.warning(
                f"We are requesting that peer enable option {option!r}, but the option is "
                + f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
            )
        elif state.him.enabled:
            logger.warning(f"Requesting that peer enable an already enabled option {option!r}.")
        else:
            state.him.negotiating = True
            self._do(option)

    def dont(self, option: bytes) -> None:
        """
        Tells peer we would like him to disable a Telnet option.

        Nothing is sent (only a warning is logged) if the option is already
        being negotiated by either side, or is not enabled on the peer's side.

        Args:
            option: The option we wish peer to disable.
        """
        state = self.get_option_state(option)
        if state.us.negotiating or state.him.negotiating:
            logger.warning(
                f"We are requesting that peer disable option {option!r}, but the option is "
                + f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
            )
        elif not state.him.enabled:
            logger.warning(f"Requesting that peer disable an already disabled option {option!r}.")
        else:
            state.him.negotiating = True
            self._dont(option)

    def get_option_state(self, option: bytes) -> _OptionState:
        """
        Retrieves the state of an option.

        A fresh state object is created and remembered the first time an option is seen.

        Args:
            option: The option we wish to get the state of.

        Returns:
            The option state.
        """
        if option not in self._options:
            self._options[option] = _OptionState()
        return self._options[option]

    def request_negotiation(self, option: bytes, data: bytes) -> None:
        """
        Performs a Telnet sub-negotiation for the given option.

        Args:
            option: The option we are negotiating.
            data: The data we are sending in the body of the negotiation.
        """
        # The payload is passed through escape_iac before being framed by IAC SB ... IAC SE.
        self.write(IAC + SB + option + escape_iac(data) + IAC + SE)

    def on_connection_made(self) -> None:  # NOQA: D102
        return super().on_connection_made()  # type: ignore[safe-super]

    def on_connection_lost(self) -> None:  # NOQA: D102
        return super().on_connection_lost()  # type: ignore[safe-super]

    def on_data_received(self, data: bytes) -> None:  # NOQA: C901, D102, PLR0912, PLR0915
        # Parser state (self.state, self._command, self._commands) lives on the
        # instance, so a sequence split across two calls resumes where it left off.
        # Application data seen during this call is collected in app_data_buffer and
        # handed to the parent implementation before every command, negotiation, or
        # subnegotiation callback, and once more when the input is exhausted.
        app_data_buffer: bytearray = bytearray()
        while data:
            if self.state is TelnetState.DATA:
                # Consume everything up to the next IAC in one step.
                app_data, separator, data = data.partition(IAC)
                if separator:
                    self.state = TelnetState.COMMAND
                elif app_data.endswith(CR):
                    # A trailing CR cannot be interpreted until the next byte
                    # arrives, so hold it back and remember that in the state.
                    self.state = TelnetState.NEWLINE
                    app_data = app_data[:-1]
                app_data_buffer.extend(app_data.replace(CR_LF, LF).replace(CR_NULL, CR))
                continue
            # Every other state consumes exactly one byte per iteration.
            byte, data = data[:1], data[1:]
            if self.state is TelnetState.COMMAND:
                if byte == IAC:
                    # Escaped IAC.
                    app_data_buffer.extend(byte)
                    self.state = TelnetState.DATA
                elif byte == SE:
                    self.state = TelnetState.DATA
                    logger.warning("IAC SE received outside of subnegotiation.")
                elif byte == SB:
                    self.state = TelnetState.SUBNEGOTIATION
                    self._commands: bytearray = bytearray()
                elif byte in COMMAND_BYTES:
                    self.state = TelnetState.DATA
                    if app_data_buffer:
                        super().on_data_received(bytes(app_data_buffer))
                        app_data_buffer.clear()
                    logger.debug(f"Received from peer: IAC {DESCRIPTIONS[byte]}")
                    self.on_command(byte, None)
                elif byte in NEGOTIATION_BYTES:
                    # The option byte follows; remember which negotiation this is.
                    self.state = TelnetState.NEGOTIATION
                    self._command = byte
                else:
                    self.state = TelnetState.DATA
                    logger.warning(f"Unknown Telnet command received {byte!r}.")
            elif self.state is TelnetState.NEGOTIATION:
                self.state = TelnetState.DATA
                command = self._command
                del self._command
                if app_data_buffer:
                    super().on_data_received(bytes(app_data_buffer))
                    app_data_buffer.clear()
                logger.debug(
                    f"Received from peer: IAC {DESCRIPTIONS[command]} {DESCRIPTIONS.get(byte, repr(byte))}"
                )
                self.on_command(command, byte)
            elif self.state is TelnetState.NEWLINE:
                self.state = TelnetState.DATA
                if byte == LF:
                    app_data_buffer.extend(byte)
                elif byte == NULL:
                    app_data_buffer.extend(CR)
                elif byte == IAC:
                    # IAC isn't really allowed after CR, according to the
                    # RFC, but handling it this way is less surprising than
                    # delivering the IAC to the app as application data.
                    # The purpose of the restriction is to allow terminals
                    # to unambiguously interpret the behavior of the CR
                    # after reading only one more byte. CR + LF is supposed
                    # to mean one thing, cursor to next line, first column,
                    # CR + NUL another, cursor to first column. Absent the
                    # NUL, it still makes sense to interpret this as CR and
                    # then apply all the usual interpretation to the IAC.
                    app_data_buffer.extend(CR)
                    self.state = TelnetState.COMMAND
                else:
                    app_data_buffer.extend(CR + byte)
            elif self.state is TelnetState.SUBNEGOTIATION:
                if byte == IAC:
                    self.state = TelnetState.SUBNEGOTIATION_ESCAPED
                else:
                    self._commands.extend(byte)
            elif self.state is TelnetState.SUBNEGOTIATION_ESCAPED:
                if byte == SE:
                    # IAC SE terminates the subnegotiation.
                    self.state = TelnetState.DATA
                    commands = bytes(self._commands)
                    del self._commands
                    if app_data_buffer:
                        super().on_data_received(bytes(app_data_buffer))
                        app_data_buffer.clear()
                    # The first collected byte is the option; the rest is the payload.
                    option, commands = commands[:1], commands[1:]
                    logger.debug(
                        f"Received from peer: IAC SB {DESCRIPTIONS.get(option, repr(option))} "
                        + f"{commands!r} IAC SE"
                    )
                    self.on_subnegotiation(option, commands)
                else:
                    # Any other byte after IAC is kept as payload (IAC IAC yields one IAC).
                    self.state = TelnetState.SUBNEGOTIATION
                    self._commands.extend(byte)
        if app_data_buffer:
            super().on_data_received(bytes(app_data_buffer))

    def on_command(self, command: bytes, option: bytes | None) -> None:  # NOQA: D102
        # Look the handler up once; fall back to the unhandled hook when none is installed.
        try:
            handler = self.command_map[command]
        except KeyError:
            self.on_unhandled_command(command, option)
        else:
            handler(option)

    def on_subnegotiation(self, option: bytes, data: bytes) -> None:  # NOQA: D102
        # Look the handler up once; fall back to the unhandled hook when none is installed.
        try:
            handler = self.subnegotiation_map[option]
        except KeyError:
            self.on_unhandled_subnegotiation(option, data)
        else:
            handler(data)

    def on_will(self, option: bytes | None) -> None:
        """
        Called when an IAC + WILL + option is received.

        Args:
            option: The received option.

        Raises:
            AssertionError: If option is None, if on_enable_remote rejects an option
                that we ourselves asked the peer to enable, or if the option state
                is both enabled and negotiating.
        """  # NOQA: DOC501
        if option is None:
            raise AssertionError("Option must not be None in this context.")
        state = self.get_option_state(option)
        if not state.him.enabled and not state.him.negotiating:
            # Peer is unilaterally offering to enable an option.
            if self.on_enable_remote(option):
                state.him.enabled = True
                self._do(option)
                self.on_option_enabled(option)
            else:
                self._dont(option)
        elif not state.him.enabled and state.him.negotiating:
            # Peer agreed to enable an option in response to our request.
            state.him.enabled = True
            state.him.negotiating = False
            if not self.on_enable_remote(option):
                raise AssertionError(f"on_enable_remote must return True in this context (for option {option!r})")
            self.on_option_enabled(option)
        elif state.him.enabled and not state.him.negotiating:
            # Peer is unilaterally offering to enable an already-enabled option.
            # Ignore this.
            pass
        elif state.him.enabled and state.him.negotiating:
            # This is a bogus state. It is here for completeness. It will
            # never be entered.
            raise AssertionError(
                "him.enabled and him.negotiating cannot be True at the same time. "
                + f"state: {state!r}, option: {option!r}"
            )

    def on_wont(self, option: bytes | None) -> None:
        """
        Called when an IAC + WONT + option is received.

        Args:
            option: The received option.

        Raises:
            AssertionError: If option is None.
        """  # NOQA: DOC501
        if option is None:
            raise AssertionError("Option must not be None in this context.")
        state = self.get_option_state(option)
        if not state.him.enabled and not state.him.negotiating:
            # Peer is unilaterally demanding that an already-disabled option be/remain disabled.
            # Ignore this, although we could record it and refuse subsequent enable attempts
            # from our side, peer could refuse them again, so we won't.
            pass
        elif not state.him.enabled and state.him.negotiating:
            # Peer refused to enable an option in response to our request.
            state.him.negotiating = False
            logger.debug(
                f"Peer refuses to enable option {DESCRIPTIONS.get(option, repr(option))} "
                + "in response to our request."
            )
        elif state.him.enabled and not state.him.negotiating:
            # Peer is unilaterally demanding that an option be disabled.
            state.him.enabled = False
            self.on_disable_remote(option)
            self._dont(option)
        elif state.him.enabled and state.him.negotiating:
            # Peer agreed to disable an option at our request.
            state.him.enabled = False
            state.him.negotiating = False
            self.on_disable_remote(option)

    def on_do(self, option: bytes | None) -> None:
        """
        Called when an IAC + DO + option is received.

        Args:
            option: The received option.

        Raises:
            AssertionError: If option is None, or if the option state is both
                enabled and negotiating.
        """  # NOQA: DOC501
        if option is None:
            raise AssertionError("Option must not be None in this context.")
        state = self.get_option_state(option)
        if not state.us.enabled and not state.us.negotiating:
            # Peer is unilaterally requesting that we enable an option.
            if self.on_enable_local(option):
                state.us.enabled = True
                self._will(option)
                self.on_option_enabled(option)
            else:
                self._wont(option)
        elif not state.us.enabled and state.us.negotiating:
            # Peer agreed to allow us to enable an option at our request.
            state.us.enabled = True
            state.us.negotiating = False
            # NOTE: unlike on_will, the hook's return value is not checked here.
            self.on_enable_local(option)
            self.on_option_enabled(option)
        elif state.us.enabled and not state.us.negotiating:
            # Peer is unilaterally requesting us to enable an already-enabled option.
            # Ignore this.
            pass
        elif state.us.enabled and state.us.negotiating:
            # This is a bogus state. It is here for completeness. It will never be
            # entered.
            raise AssertionError(
                "us.enabled and us.negotiating cannot be True at the same time. "
                + f"state: {state!r}, option: {option!r}"
            )

    def on_dont(self, option: bytes | None) -> None:
        """
        Called when an IAC + DONT + option is received.

        Args:
            option: The received option.

        Raises:
            AssertionError: If option is None.
        """  # NOQA: DOC501
        if option is None:
            raise AssertionError("Option must not be None in this context.")
        state = self.get_option_state(option)
        if not state.us.enabled and not state.us.negotiating:
            # Peer is unilaterally demanding us to disable an already-disabled option.
            # Ignore this.
            pass
        elif not state.us.enabled and state.us.negotiating:
            # Offered option was refused.
            state.us.negotiating = False
            logger.debug(f"Peer rejects our offer to enable option {DESCRIPTIONS.get(option, repr(option))}.")
        elif state.us.enabled and not state.us.negotiating:
            # Peer is unilaterally demanding we disable an option.
            state.us.enabled = False
            self.on_disable_local(option)
            self._wont(option)
        elif state.us.enabled and state.us.negotiating:
            # Peer acknowledged our notice that we will disable an option.
            state.us.enabled = False
            state.us.negotiating = False
            self.on_disable_local(option)

    # The remaining overrides only delegate to the parent implementations.

    def on_unhandled_command(self, command: bytes, option: bytes | None) -> None:  # NOQA: D102
        return super().on_unhandled_command(command, option)  # type: ignore[safe-super]

    def on_unhandled_subnegotiation(self, option: bytes, data: bytes) -> None:  # NOQA: D102
        return super().on_unhandled_subnegotiation(option, data)  # type: ignore[safe-super]

    def on_enable_local(self, option: bytes) -> bool:  # NOQA: D102
        return super().on_enable_local(option)

    def on_disable_local(self, option: bytes) -> None:  # NOQA: D102
        return super().on_disable_local(option)  # type: ignore[safe-super]

    def on_enable_remote(self, option: bytes) -> bool:  # NOQA: D102
        return super().on_enable_remote(option)

    def on_disable_remote(self, option: bytes) -> None:  # NOQA: D102
        return super().on_disable_remote(option)  # type: ignore[safe-super]

    def on_option_enabled(self, option: bytes) -> None:  # NOQA: D102
        return super().on_option_enabled(option)  # type: ignore[safe-super]
Attribute is_client: bool
inherited
property
readonly
¶
True if acting as a client, False otherwise.
Attribute is_server: bool
inherited
property
readonly
¶
True if acting as a server, False otherwise.
Method __init__(self, *args, **kwargs)
special
¶
Defines the constructor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `*args` | Any | Positional arguments to be passed to the parent constructor. | () |
| `**kwargs` | Any | Keyword arguments to be passed to the parent constructor. | {} |
Source code in mudproto/telnet.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Sets up the parser state, the option table, and the handler maps.

    Args:
        *args: Positional arguments forwarded to the parent constructor.
        **kwargs: Keyword arguments forwarded to the parent constructor.
    """
    super().__init__(*args, **kwargs)
    self.state: TelnetState = TelnetState.DATA
    """The state of the state machine."""
    self._options: dict[bytes, _OptionState] = {}
    """A mapping of option bytes to their current state."""
    # Command dispatch table. The command byte (the first byte after IAC) of a
    # received Telnet command is looked up in command_map; a callable found
    # there is invoked with the command's argument, or with None when the
    # command takes no argument. Add entries here to handle further commands.
    # Only WILL, WONT, DO, and DONT are handled by default, and those entries
    # should not be overridden: this class handles them correctly and provides
    # an API for interacting with them.
    self.command_map: TelnetCommandMapType = {
        WILL: self.on_will,
        WONT: self.on_wont,
        DO: self.on_do,
        DONT: self.on_dont,
    }
    # Subnegotiation dispatch table. The option byte (the first byte after SB)
    # of a received subnegotiation is looked up in subnegotiation_map; a
    # callable found there is invoked with the subnegotiation's argument. Add
    # entries here to handle subnegotiations. Nothing is handled by default.
    self.subnegotiation_map: TelnetSubnegotiationMapType = {}
Method do(self, option)
¶
Tells peer we would like him to enable a Telnet option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option we wish peer to enable. |
required |
Source code in mudproto/telnet.py
def do(self, option: bytes) -> None:
    """
    Asks the peer to enable a Telnet option on his side.

    Only a warning is logged, and nothing is sent, when the option is already
    being negotiated by either side or is already enabled on the peer's side.

    Args:
        option: The option we wish peer to enable.
    """
    state = self.get_option_state(option)
    if state.us.negotiating or state.him.negotiating:
        logger.warning(
            f"We are requesting that peer enable option {option!r}, but the option is "
            + f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
        )
        return
    if state.him.enabled:
        logger.warning(f"Requesting that peer enable an already enabled option {option!r}.")
        return
    # Mark the request as pending before it goes out on the wire.
    state.him.negotiating = True
    self._do(option)
Method dont(self, option)
¶
Tells peer we would like him to disable a Telnet option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| option | bytes | The option we wish peer to disable. | required |
Source code in mudproto/telnet.py
def dont(self, option: bytes) -> None:
    """
    Asks peer to turn off a Telnet option on its side.

    The request is only sent when the option is idle and currently enabled
    by peer; otherwise a warning is logged and nothing is sent.

    Args:
        option: The option we wish to disable.
    """
    state = self.get_option_state(option)
    # Guard: a negotiation for this option is already in flight.
    if state.us.negotiating or state.him.negotiating:
        logger.warning(
            f"We are requesting that peer disable option {option!r}, but the option is "
            + f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
        )
        return
    # Guard: nothing to disable, peer does not have the option on.
    if not state.him.enabled:
        logger.warning(f"Requesting that peer disable an already disabled option {option!r}.")
        return
    # Mark the request as pending before it goes out on the wire.
    state.him.negotiating = True
    self._dont(option)
Method get_option_state(self, option)
¶
Retrieves the state of an option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option we wish to get the state of. |
required |
Returns:
| Type | Description |
|---|---|
_OptionState |
The option state. |
Source code in mudproto/telnet.py
def get_option_state(self, option: bytes) -> _OptionState:
    """
    Looks up the tracked state of an option, creating it on first use.

    Args:
        option: The option we wish to get the state of.

    Returns:
        The option state.
    """
    known_options = self._options
    if option in known_options:
        return known_options[option]
    # First time this option is seen: register a fresh state object for it.
    fresh_state = known_options[option] = _OptionState()
    return fresh_state
Method on_command(self, command, option)
¶
Called when a 1 or 2 byte command is received.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
command |
bytes |
The first byte in a 1 or 2 byte negotiation sequence. |
required |
option |
bytes | None |
The second byte in a 2 byte negotiation sequence or None. |
required |
Source code in mudproto/telnet.py
def on_command(self, command: bytes, option: bytes | None) -> None:  # NOQA: D102
    # Dispatch to the handler registered in command_map for this command byte;
    # commands without a registered handler go to on_unhandled_command.
    try:
        handler = self.command_map[command]
    except KeyError:
        self.on_unhandled_command(command, option)
    else:
        handler(option)
Method on_connection_lost(self)
¶
Called by disconnect when a connection to peer has been lost.
Source code in mudproto/telnet.py
def on_connection_lost(self) -> None:  # NOQA: D102
    # Pure pass-through: forwards the event to the next on_connection_lost in the
    # MRO. NOTE(review): the ignore presumably silences mypy's safe-super check
    # because the parent method is abstract -- confirm.
    return super().on_connection_lost()  # type: ignore[safe-super]
Method on_connection_made(self)
¶
Called by connect when a connection to peer has been established.
Source code in mudproto/telnet.py
def on_connection_made(self) -> None:  # NOQA: D102
    # Pure pass-through: forwards the event to the next on_connection_made in the
    # MRO. NOTE(review): the ignore presumably silences mypy's safe-super check
    # because the parent method is abstract -- confirm.
    return super().on_connection_made()  # type: ignore[safe-super]
Method on_data_received(self, data)
¶
Called by parse when data is received.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data |
bytes |
The received data. |
required |
Source code in mudproto/telnet.py
def on_data_received(self, data: bytes) -> None:  # NOQA: C901, D102, PLR0912, PLR0915
    # Incremental Telnet parser. ``self.state`` persists between calls, so a
    # sequence may be split across chunks at any byte. Application data is
    # accumulated in ``app_data_buffer`` and handed to the parent
    # ``on_data_received``; the buffer is flushed before every command,
    # negotiation or subnegotiation callback so events are delivered in
    # stream order.
    app_data_buffer: bytearray = bytearray()
    while data:
        if self.state is TelnetState.DATA:
            # Fast path: take everything up to the next IAC in one step.
            app_data, separator, data = data.partition(IAC)
            if separator:
                self.state = TelnetState.COMMAND
            elif app_data.endswith(CR):
                # The chunk ends on a CR whose meaning depends on the next
                # byte, so hold it back until more data arrives.
                self.state = TelnetState.NEWLINE
                app_data = app_data[:-1]
            app_data_buffer.extend(app_data.replace(CR_LF, LF).replace(CR_NULL, CR))
            continue
        # Every other state consumes exactly one byte per iteration.
        byte, data = data[:1], data[1:]
        if self.state is TelnetState.COMMAND:
            if byte == IAC:
                # Escaped IAC.
                app_data_buffer.extend(byte)
                self.state = TelnetState.DATA
            elif byte == SE:
                self.state = TelnetState.DATA
                logger.warning("IAC SE received outside of subnegotiation.")
            elif byte == SB:
                self.state = TelnetState.SUBNEGOTIATION
                self._commands: bytearray = bytearray()
            elif byte in COMMAND_BYTES:
                # Single-byte command: IAC + command.
                self.state = TelnetState.DATA
                if app_data_buffer:
                    super().on_data_received(bytes(app_data_buffer))
                    app_data_buffer.clear()
                logger.debug(f"Received from peer: IAC {DESCRIPTIONS[byte]}")
                self.on_command(byte, None)
            elif byte in NEGOTIATION_BYTES:
                # Two-byte command: remember it until the option byte arrives.
                self.state = TelnetState.NEGOTIATION
                self._command = byte
            else:
                self.state = TelnetState.DATA
                logger.warning(f"Unknown Telnet command received {byte!r}.")
        elif self.state is TelnetState.NEGOTIATION:
            # ``byte`` is the option belonging to the stored negotiation command.
            self.state = TelnetState.DATA
            command = self._command
            del self._command
            if app_data_buffer:
                super().on_data_received(bytes(app_data_buffer))
                app_data_buffer.clear()
            logger.debug(
                f"Received from peer: IAC {DESCRIPTIONS[command]} {DESCRIPTIONS.get(byte, repr(byte))}"
            )
            self.on_command(command, byte)
        elif self.state is TelnetState.NEWLINE:
            # A CR from earlier data is pending; ``byte`` decides its meaning.
            self.state = TelnetState.DATA
            if byte == LF:
                app_data_buffer.extend(byte)
            elif byte == NULL:
                app_data_buffer.extend(CR)
            elif byte == IAC:
                # IAC isn't really allowed after CR, according to the
                # RFC, but handling it this way is less surprising than
                # delivering the IAC to the app as application data.
                # The purpose of the restriction is to allow terminals
                # to unambiguously interpret the behavior of the CR
                # after reading only one more byte. CR + LF is supposed
                # to mean one thing, cursor to next line, first column,
                # CR + NUL another, cursor to first column. Absent the
                # NUL, it still makes sense to interpret this as CR and
                # then apply all the usual interpretation to the IAC.
                app_data_buffer.extend(CR)
                self.state = TelnetState.COMMAND
            elif byte == CR:
                # The pending CR is a bare CR, but this second CR may itself
                # be the start of CR LF or CR NUL. Emit the first and keep
                # the second pending, so the result does not depend on where
                # the chunk boundary fell (the DATA fast path pairs the
                # second CR with a following LF or NUL in the same way).
                app_data_buffer.extend(CR)
                self.state = TelnetState.NEWLINE
            else:
                app_data_buffer.extend(CR + byte)
        elif self.state is TelnetState.SUBNEGOTIATION:
            if byte == IAC:
                self.state = TelnetState.SUBNEGOTIATION_ESCAPED
            else:
                self._commands.extend(byte)
        elif self.state is TelnetState.SUBNEGOTIATION_ESCAPED:
            if byte == SE:
                # IAC SE terminates the subnegotiation.
                self.state = TelnetState.DATA
                commands = bytes(self._commands)
                del self._commands
                if app_data_buffer:
                    super().on_data_received(bytes(app_data_buffer))
                    app_data_buffer.clear()
                # The first collected byte is the option, the rest is payload.
                option, commands = commands[:1], commands[1:]
                logger.debug(
                    f"Received from peer: IAC SB {DESCRIPTIONS.get(option, repr(option))} "
                    + f"{commands!r} IAC SE"
                )
                self.on_subnegotiation(option, commands)
            else:
                # IAC followed by anything else: keep the byte as payload
                # (for IAC IAC this yields a single literal IAC).
                self.state = TelnetState.SUBNEGOTIATION
                self._commands.extend(byte)
    # Deliver whatever application data is left once the chunk is exhausted.
    if app_data_buffer:
        super().on_data_received(bytes(app_data_buffer))
Method on_disable_local(self, option)
¶
Disables a locally managed option.
This method is called before we disable a locally enabled option, in order to perform any necessary cleanup.
Note
If on_enable_local is overridden, this method must be overridden as well.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option being disabled. |
required |
Source code in mudproto/telnet.py
def on_disable_local(self, option: bytes) -> None:  # NOQA: D102
    # Pure pass-through: forwards the option to the next on_disable_local in the
    # MRO. NOTE(review): the ignore presumably silences mypy's safe-super check
    # because the parent method is abstract -- confirm.
    return super().on_disable_local(option)  # type: ignore[safe-super]
Method on_disable_remote(self, option)
¶
Disables a remotely managed option.
This method is called when peer disables a remotely enabled option, in order to perform any necessary cleanup on our end.
Note
If on_enable_remote is overridden, this method must be overridden as well.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option being disabled. |
required |
Source code in mudproto/telnet.py
def on_disable_remote(self, option: bytes) -> None:  # NOQA: D102
    # Pure pass-through: forwards the option to the next on_disable_remote in the
    # MRO. NOTE(review): the ignore presumably silences mypy's safe-super check
    # because the parent method is abstract -- confirm.
    return super().on_disable_remote(option)  # type: ignore[safe-super]
Method on_do(self, option)
¶
Called when an IAC + DO + option is received.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes | None |
The received option. |
required |
Source code in mudproto/telnet.py
def on_do(self, option: bytes | None) -> None:
    """
    Called when an IAC + DO + option is received.

    Reacts to peer asking us to enable an option, or agreeing to our own
    offer, depending on the tracked ``us`` state for that option.

    Args:
        option: The received option.
    """  # NOQA: DOC501
    if option is None:
        raise AssertionError("Option must not be None in this context.")
    state = self.get_option_state(option)
    ours = state.us
    if ours.enabled:
        if ours.negotiating:
            # This is a bogus state. It is here for completeness. It will never be
            # entered.
            raise AssertionError(
                "us.enabled and us.negotiating cannot be True at the same time. "
                + f"state: {state!r}, option: {option!r}"
            )
        # Peer is unilaterally requesting us to enable an already-enabled option.
        # Ignore this.
        return
    if ours.negotiating:
        # Peer agreed to allow us to enable an option at our request.
        ours.enabled = True
        ours.negotiating = False
        self.on_enable_local(option)
        self.on_option_enabled(option)
        return
    # Peer is unilaterally requesting that we enable an option.
    if self.on_enable_local(option):
        ours.enabled = True
        self._will(option)
        self.on_option_enabled(option)
    else:
        self._wont(option)
Method on_dont(self, option)
¶
Called when an IAC + DONT + option is received.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes | None |
The received option. |
required |
Source code in mudproto/telnet.py
def on_dont(self, option: bytes | None) -> None:
    """
    Called when an IAC + DONT + option is received.

    Reacts to peer demanding that we disable an option, or answering our own
    offer or notice, depending on the tracked ``us`` state for that option.

    Args:
        option: The received option.
    """  # NOQA: DOC501
    if option is None:
        raise AssertionError("Option must not be None in this context.")
    ours = self.get_option_state(option).us
    if not ours.enabled:
        if ours.negotiating:
            # Offered option was refused.
            ours.negotiating = False
            logger.debug(f"Peer rejects our offer to enable option {DESCRIPTIONS.get(option, repr(option))}.")
        # Otherwise peer is unilaterally demanding us to disable an
        # already-disabled option. Ignore this.
        return
    ours.enabled = False
    if ours.negotiating:
        # Peer acknowledged our notice that we will disable an option.
        ours.negotiating = False
        self.on_disable_local(option)
    else:
        # Peer is unilaterally demanding we disable an option.
        self.on_disable_local(option)
        self._wont(option)
Method on_enable_local(self, option)
¶
Called to accept or reject the request for us to manage the option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option that peer requests us to handle. |
required |
Returns:
| Type | Description |
|---|---|
bool |
True if we will handle the option, False otherwise. |
Source code in mudproto/telnet.py
def on_enable_local(self, option: bytes) -> bool:  # NOQA: D102
    # Delegates the accept/reject decision to the next on_enable_local in the MRO
    # and returns its result unchanged.
    return super().on_enable_local(option)
Method on_enable_remote(self, option)
¶
Called to accept or reject the request for peer to manage the option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option that peer wants to handle. |
required |
Returns:
| Type | Description |
|---|---|
bool |
True if we will allow peer to handle the option, False otherwise. |
Source code in mudproto/telnet.py
def on_enable_remote(self, option: bytes) -> bool:  # NOQA: D102
    # Delegates the accept/reject decision to the next on_enable_remote in the MRO
    # and returns its result unchanged.
    return super().on_enable_remote(option)
Method on_option_enabled(self, option)
¶
Called after an option has been fully enabled.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option that has been enabled. |
required |
Source code in mudproto/telnet.py
def on_option_enabled(self, option: bytes) -> None:  # NOQA: D102
    # Pure pass-through: forwards the option to the next on_option_enabled in the
    # MRO. NOTE(review): the ignore presumably silences mypy's safe-super check
    # because the parent method is abstract -- confirm.
    return super().on_option_enabled(option)  # type: ignore[safe-super]
Method on_subnegotiation(self, option, data)
¶
Called when a subnegotiation is received.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The subnegotiation option. |
required |
data |
bytes |
The payload. |
required |
Source code in mudproto/telnet.py
def on_subnegotiation(self, option: bytes, data: bytes) -> None:  # NOQA: D102
    # Dispatch the payload to the handler registered in subnegotiation_map for
    # this option; options without a handler go to on_unhandled_subnegotiation.
    try:
        handler = self.subnegotiation_map[option]
    except KeyError:
        self.on_unhandled_subnegotiation(option, data)
    else:
        handler(data)
Method on_unhandled_command(self, command, option)
¶
Called for commands for which no handler is installed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
command |
bytes |
The first byte in a 1 or 2 byte negotiation sequence. |
required |
option |
bytes | None |
The second byte in a 2 byte negotiation sequence or None. |
required |
Source code in mudproto/telnet.py
def on_unhandled_command(self, command: bytes, option: bytes | None) -> None:  # NOQA: D102
    # Pure pass-through: forwards the command and option to the next
    # on_unhandled_command in the MRO. NOTE(review): the ignore presumably
    # silences mypy's safe-super check because the parent method is abstract -- confirm.
    return super().on_unhandled_command(command, option)  # type: ignore[safe-super]
Method on_unhandled_subnegotiation(self, option, data)
¶
Called for subnegotiations for which no handler is installed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The subnegotiation option. |
required |
data |
bytes |
The payload. |
required |
Source code in mudproto/telnet.py
def on_unhandled_subnegotiation(self, option: bytes, data: bytes) -> None:  # NOQA: D102
    # Pure pass-through: forwards the option and payload to the next
    # on_unhandled_subnegotiation in the MRO. NOTE(review): the ignore presumably
    # silences mypy's safe-super check because the parent method is abstract -- confirm.
    return super().on_unhandled_subnegotiation(option, data)  # type: ignore[safe-super]
Method on_will(self, option)
¶
Called when an IAC + WILL + option is received.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes | None |
The received option. |
required |
Source code in mudproto/telnet.py
def on_will(self, option: bytes | None) -> None:
    """
    Called when an IAC + WILL + option is received.

    Reacts to peer offering to enable an option on its side, or agreeing to
    our own request, depending on the tracked ``him`` state for that option.
    An AssertionError signals a programming error: a missing option, a
    handler that rejects an option we ourselves requested, or an impossible
    state.

    Args:
        option: The received option.
    """  # NOQA: DOC501
    if option is None:
        raise AssertionError("Option must not be None in this context.")
    state = self.get_option_state(option)
    if not state.him.enabled and not state.him.negotiating:
        # Peer is unilaterally offering to enable an option.
        if self.on_enable_remote(option):
            state.him.enabled = True
            self._do(option)
            self.on_option_enabled(option)
        else:
            self._dont(option)
    elif not state.him.enabled and state.him.negotiating:
        # Peer agreed to enable an option in response to our request.
        state.him.enabled = True
        state.him.negotiating = False
        # We asked for this option ourselves, so the handler must accept it.
        if not self.on_enable_remote(option):
            raise AssertionError(f"on_enable_remote must return True in this context (for option {option!r})")
        self.on_option_enabled(option)
    elif state.him.enabled and not state.him.negotiating:
        # Peer is unilaterally offering to enable an already-enabled option.
        # Ignore this.
        pass
    elif state.him.enabled and state.him.negotiating:
        # This is a bogus state. It is here for completeness. It will
        # never be entered.
        raise AssertionError(
            "him.enabled and him.negotiating cannot be True at the same time. "
            + f"state: {state!r}, option: {option!r}"
        )
Method on_wont(self, option)
¶
Called when an IAC + WONT + option is received.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes | None |
The received option. |
required |
Source code in mudproto/telnet.py
def on_wont(self, option: bytes | None) -> None:
    """
    Called when an IAC + WONT + option is received.

    Reacts to peer refusing or disabling an option on its side, depending on
    the tracked ``him`` state for that option.

    Args:
        option: The received option.
    """  # NOQA: DOC501
    if option is None:
        raise AssertionError("Option must not be None in this context.")
    theirs = self.get_option_state(option).him
    if not theirs.enabled:
        if theirs.negotiating:
            # Peer refused to enable an option in response to our request.
            theirs.negotiating = False
            logger.debug(
                f"Peer refuses to enable option {DESCRIPTIONS.get(option, repr(option))} "
                + "in response to our request."
            )
        # Otherwise peer is unilaterally demanding that an already-disabled option
        # be/remain disabled. Ignore this, although we could record it and refuse
        # subsequent enable attempts from our side, peer could refuse them again,
        # so we won't.
        return
    theirs.enabled = False
    if theirs.negotiating:
        # Peer agreed to disable an option at our request.
        theirs.negotiating = False
        self.on_disable_remote(option)
    else:
        # Peer is unilaterally demanding that an option be disabled.
        self.on_disable_remote(option)
        self._dont(option)
Method request_negotiation(self, option, data)
¶
Performs a Telnet sub-negotiation for the given option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option we are negotiating. |
required |
data |
bytes |
The data we are sending in the body of the negotiation. |
required |
Source code in mudproto/telnet.py
def request_negotiation(self, option: bytes, data: bytes) -> None:
    """
    Sends a Telnet subnegotiation for the given option to peer.

    The payload is passed through escape_iac and framed as
    IAC SB <option> <payload> IAC SE before being written.

    Args:
        option: The option we are negotiating.
        data: The data we are sending in the body of the negotiation.
    """
    header = IAC + SB + option
    trailer = IAC + SE
    self.write(header + escape_iac(data) + trailer)
Method will(self, option)
¶
Tells peer we would like to enable a Telnet option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option we wish to enable. |
required |
Source code in mudproto/telnet.py
def will(self, option: bytes) -> None:
    """
    Offers to turn on a Telnet option on our side.

    The offer is only sent when the option is idle: if either side is
    already negotiating it, or we already have it enabled, a warning is
    logged and nothing is sent.

    Args:
        option: The option we wish to enable.
    """
    state = self.get_option_state(option)
    # Guard: a negotiation for this option is already in flight.
    if state.us.negotiating or state.him.negotiating:
        logger.warning(
            f"We are offering to enable option {option!r}, but the option is "
            + f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
        )
        return
    # Guard: nothing to offer, the option is already on.
    if state.us.enabled:
        logger.warning(f"Attempting to enable an already enabled option {option!r}.")
        return
    # Mark the offer as pending before it goes out on the wire.
    state.us.negotiating = True
    self._will(option)
Method wont(self, option)
¶
Tells peer we no longer wish to enable a Telnet option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
option |
bytes |
The option we wish to disable. |
required |
Source code in mudproto/telnet.py
def wont(self, option: bytes) -> None:
    """
    Announces that we are turning off a Telnet option on our side.

    The notice is only sent when the option is idle and currently enabled by
    us; otherwise a warning is logged and nothing is sent.

    Args:
        option: The option we wish to disable.
    """
    state = self.get_option_state(option)
    # Guard: a negotiation for this option is already in flight.
    if state.us.negotiating or state.him.negotiating:
        logger.warning(
            f"We are refusing to enable option {option!r}, but the option is "
            + f"already being negotiated by {'us' if state.us.negotiating else 'peer'}."
        )
        return
    # Guard: nothing to disable, the option is not on.
    if not state.us.enabled:
        logger.warning(f"Attempting to disable an already disabled option {option!r}.")
        return
    # Mark the notice as pending before it goes out on the wire.
    state.us.negotiating = True
    self._wont(option)
Method write(self, data)
inherited
¶
Writes data to peer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data |
bytes |
The bytes to be written. |
required |
Source code in mudproto/telnet.py
def write(self, data: bytes) -> None:
    """
    Hands data to the writer callable so it is sent to peer.

    Args:
        data: The bytes to be written.
    """
    send = self._writer
    send(data)
Class TelnetState(Enum)
¶
Valid states for the state machine.
Source code in mudproto/telnet.py
class TelnetState(Enum):
    """Valid states for the state machine."""

    # Plain application data; the parser scans ahead for the next IAC.
    DATA = auto()
    # An IAC was seen; the next byte is the command.
    COMMAND = auto()
    # A chunk of data ended on CR; the next byte decides how it is delivered.
    NEWLINE = auto()
    # A negotiation command byte was seen; the next byte is its option.
    NEGOTIATION = auto()
    # Inside IAC SB ...; bytes are collected until an IAC arrives.
    SUBNEGOTIATION = auto()
    # An IAC was seen inside a subnegotiation; SE ends it, anything else is payload.
    SUBNEGOTIATION_ESCAPED = auto()
Function escape_iac(data)
¶
Escapes IAC bytes of a bytes-like object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data |
bytes |
The data to be escaped. |
required |
Returns:
| Type | Description |
|---|---|
bytes |
The data with IAC bytes escaped. |
Source code in mudproto/telnet.py
def escape_iac(data: bytes) -> bytes:
    """
    Replaces every IAC byte in the data with IAC_IAC.

    Args:
        data: The data to be escaped.

    Returns:
        The data with IAC bytes escaped.
    """
    escaped = data.replace(IAC, IAC_IAC)
    return escaped