""" Parser for the Telnet protocol. (Not a complete implementation of the telnet specification, but sufficient for a command line interface.) Inspired by `Twisted.conch.telnet`. """ import struct from typing import Callable, Generator from .log import logger __all__ = [ "TelnetProtocolParser", ] def int2byte(number: int) -> bytes: return bytes((number,)) # Telnet constants. NOP = int2byte(0) SGA = int2byte(3) IAC = int2byte(255) DO = int2byte(253) DONT = int2byte(254) LINEMODE = int2byte(34) SB = int2byte(250) WILL = int2byte(251) WONT = int2byte(252) MODE = int2byte(1) SE = int2byte(240) ECHO = int2byte(1) NAWS = int2byte(31) LINEMODE = int2byte(34) SUPPRESS_GO_AHEAD = int2byte(3) TTYPE = int2byte(24) SEND = int2byte(1) IS = int2byte(0) DM = int2byte(242) BRK = int2byte(243) IP = int2byte(244) AO = int2byte(245) AYT = int2byte(246) EC = int2byte(247) EL = int2byte(248) GA = int2byte(249) class TelnetProtocolParser: """ Parser for the Telnet protocol. Usage:: def data_received(data): print(data) def size_received(rows, columns): print(rows, columns) p = TelnetProtocolParser(data_received, size_received) p.feed(binary_data) """ def __init__( self, data_received_callback: Callable[[bytes], None], size_received_callback: Callable[[int, int], None], ttype_received_callback: Callable[[str], None], ) -> None: self.data_received_callback = data_received_callback self.size_received_callback = size_received_callback self.ttype_received_callback = ttype_received_callback self._parser = self._parse_coroutine() self._parser.send(None) # type: ignore def received_data(self, data: bytes) -> None: self.data_received_callback(data) def do_received(self, data: bytes) -> None: """Received telnet DO command.""" logger.info("DO %r", data) def dont_received(self, data: bytes) -> None: """Received telnet DONT command.""" logger.info("DONT %r", data) def will_received(self, data: bytes) -> None: """Received telnet WILL command.""" logger.info("WILL %r", data) def wont_received(self, data: bytes) -> None: """Received telnet WONT command.""" logger.info("WONT %r", data) def command_received(self, command: bytes, data: bytes) -> None: if command == DO: self.do_received(data) elif command == DONT: self.dont_received(data) elif command == WILL: self.will_received(data) elif command == WONT: self.wont_received(data) else: logger.info("command received %r %r", command, data) def naws(self, data: bytes) -> None: """ Received NAWS. (Window dimensions.) """ if len(data) == 4: # NOTE: the first parameter of struct.unpack should be # a 'str' object. Both on Py2/py3. This crashes on OSX # otherwise. columns, rows = struct.unpack(str("!HH"), data) self.size_received_callback(rows, columns) else: logger.warning("Wrong number of NAWS bytes") def ttype(self, data: bytes) -> None: """ Received terminal type. """ subcmd, data = data[0:1], data[1:] if subcmd == IS: ttype = data.decode("ascii") self.ttype_received_callback(ttype) else: logger.warning("Received a non-IS terminal type Subnegotiation") def negotiate(self, data: bytes) -> None: """ Got negotiate data. """ command, payload = data[0:1], data[1:] if command == NAWS: self.naws(payload) elif command == TTYPE: self.ttype(payload) else: logger.info("Negotiate (%r got bytes)", len(data)) def _parse_coroutine(self) -> Generator[None, bytes, None]: """ Parser state machine. Every 'yield' expression returns the next byte. """ while True: d = yield if d == int2byte(0): pass # NOP # Go to state escaped. elif d == IAC: d2 = yield if d2 == IAC: self.received_data(d2) # Handle simple commands. elif d2 in (NOP, DM, BRK, IP, AO, AYT, EC, EL, GA): self.command_received(d2, b"") # Handle IAC-[DO/DONT/WILL/WONT] commands. elif d2 in (DO, DONT, WILL, WONT): d3 = yield self.command_received(d2, d3) # Subnegotiation elif d2 == SB: # Consume everything until next IAC-SE data = [] while True: d3 = yield if d3 == IAC: d4 = yield if d4 == SE: break else: data.append(d4) else: data.append(d3) self.negotiate(b"".join(data)) else: self.received_data(d) def feed(self, data: bytes) -> None: """ Feed data to the parser. """ for b in data: self._parser.send(int2byte(b))