Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/history.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
History
=======
4.0.1
* bugfix: ``telnetlib3-client`` could begin a shell in wrong ECHO mode, depending on order of
options in a "connection burst".

4.0.0
* removed: ``telnetlib3.color_filter``. ``ColorFilter``, ``ColorConfig``, ``PALETTES``,
``PetsciiColorFilter``, and ``AtasciiControlFilter`` have all been moved to the downstream
Expand Down
4 changes: 2 additions & 2 deletions telnetlib3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ def connection_factory() -> client_base.BaseClient:

try:
_, protocol = await asyncio.wait_for(
asyncio.get_event_loop().create_connection(
asyncio.get_running_loop().create_connection(
connection_factory, host or "localhost", port, **conn_kwargs
),
timeout=connect_timeout,
Expand Down Expand Up @@ -1267,7 +1267,7 @@ def patched_send_env(keys: Sequence[str]) -> Dict[str, Any]:
else:
fp_ssl = ssl_module.create_default_context()

waiter_closed: asyncio.Future[None] = asyncio.get_event_loop().create_future()
waiter_closed: asyncio.Future[None] = asyncio.get_running_loop().create_future()

fp_conn_kwargs: Dict[str, Any] = {
"host": args.host,
Expand Down
141 changes: 111 additions & 30 deletions telnetlib3/client_shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,15 @@ def feed(self, data: bytes) -> bytes:

@dataclass
class _RawLoopState:
"""Mutable state bundle for :func:`_raw_event_loop`."""
"""
Mutable state bundle for :func:`_raw_event_loop`.

Initialised by :func:`telnet_client_shell` before the loop starts and mutated
in-place as mid-session negotiation arrives (e.g. server WILL ECHO toggling
after login, LINEMODE EDIT confirmed by server). On loop exit,
``switched_to_raw`` and ``reactivate_repl`` reflect final state so the caller
can decide whether to restart a REPL.
"""

switched_to_raw: bool
last_will_echo: bool
Expand Down Expand Up @@ -529,7 +537,7 @@ async def make_stdout(self) -> asyncio.StreamWriter:
write_fobj = sys.stdout
if self._istty:
write_fobj = sys.stdin
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
writer_transport, writer_protocol = await loop.connect_write_pipe(
asyncio.streams.FlowControlMixin, write_fobj
)
Expand All @@ -544,7 +552,7 @@ async def connect_stdin(self) -> asyncio.StreamReader:
"""
reader = asyncio.StreamReader()
reader_protocol = asyncio.StreamReaderProtocol(reader)
transport, _ = await asyncio.get_event_loop().connect_read_pipe(
transport, _ = await asyncio.get_running_loop().connect_read_pipe(
lambda: reader_protocol, sys.stdin
)
self._stdin_transport = transport
Expand Down Expand Up @@ -628,17 +636,37 @@ def _send_stdin(
return new_timer, pending

def _get_raw_mode(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> "bool | None":
"""Return the writer's ``ctx.raw_mode`` (``None``, ``True``, or ``False``)."""
"""
Return the raw-mode override from the writer's session context.

``None`` = auto-detect from server negotiation (default),
``True`` = force raw / character-at-a-time,
``False`` = force line mode.
"""
return writer.ctx.raw_mode

def _ensure_autoreply_engine(
telnet_writer: Union[TelnetWriter, TelnetWriterUnicode],
) -> "Optional[Any]":
"""Return the autoreply engine from the writer's context, if set."""
"""
Return the autoreply engine from the writer's session context, or ``None``.

The autoreply engine is optional application-level machinery (e.g. a macro
engine in a MUD client) that watches server output and sends pre-configured
replies. It is absent in standalone telnetlib3 and supplied by the host
application via ``writer.ctx.autoreply_engine``.
"""
return telnet_writer.ctx.autoreply_engine

def _get_linemode_buffer(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> "LinemodeBuffer":
"""Return (or lazily create) the LinemodeBuffer attached to *writer*."""
"""
Return (or lazily create) the :class:`LinemodeBuffer` attached to *writer*.

The buffer is stored as ``writer._linemode_buf`` so it persists across loop
iterations and accumulates characters between :meth:`LinemodeBuffer.feed`
calls. Created on first use because LINEMODE negotiation may complete after
the shell has already started.
"""
buf: Optional[LinemodeBuffer] = getattr(writer, "_linemode_buf", None)
if buf is None:
buf = LinemodeBuffer(
Expand Down Expand Up @@ -749,7 +777,7 @@ async def _raw_event_loop(
ar_engine = _ensure_autoreply_engine(telnet_writer)
if ar_engine is not None:
ar_engine.feed(out)
if raw_mode is None:
if raw_mode is None or (raw_mode is True and state.switched_to_raw):
mode_result = tty_shell.check_auto_mode(
state.switched_to_raw, state.last_will_echo
)
Expand All @@ -763,7 +791,7 @@ async def _raw_event_loop(
# becomes \r\n for correct display.
if state.switched_to_raw and not in_raw:
out = out.replace("\n", "\r\n")
if want_repl():
if raw_mode is None and want_repl():
state.reactivate_repl = True
stdout.write(out.encode())
_ts_file = telnet_writer.ctx.typescript_file
Expand Down Expand Up @@ -807,49 +835,102 @@ async def telnet_client_shell(
stdout = await tty_shell.make_stdout()
tty_shell.setup_winch()

# EOR/GA-based command pacing for raw-mode autoreplies.
prompt_ready_raw = asyncio.Event()
prompt_ready_raw.set()
ga_detected_raw = False

_sh_ctx: TelnetSessionContext = telnet_writer.ctx

def _on_prompt_signal_raw(_cmd: bytes) -> None:
nonlocal ga_detected_raw
ga_detected_raw = True
prompt_ready_raw.set()
ar = _sh_ctx.autoreply_engine
# Prompt-pacing via IAC GA / IAC EOR.
#
# MUD servers emit IAC GA (Go-Ahead, RFC 854) or IAC EOR (End-of-Record, RFC 885) after
# each prompt to signal "output is complete, awaiting your input." The autoreply engine
# uses this to pace its replies. It calls ctx.autoreply_wait_fn() before sending each
# reply, preventing races where a reply arrives before the server has finished rendering
# the prompt.
#
# 'server_uses_ga' becomes True on the first GA/EOR received. _wait_for_prompt is does
# nothing until 'server_uses_ga', so servers that never send GA/EOR (Most everything but
# MUDs these days) are silently unaffected.
#
# prompt_event starts SET so the first autoreply fires immediately — there is no prior
# GA to wait for. _on_ga_or_eor re-sets it on each prompt signal; _wait_for_prompt
# clears it after consuming the signal so the next autoreply waits for the following
# prompt.
prompt_event = asyncio.Event()
prompt_event.set()
server_uses_ga = False

# The session context is the decoupling point between this shell and the
# autoreply engine (which may live in a separate module). Storing
# _wait_for_prompt on it lets the engine call back into our local event state
# without a direct import or reference to this closure.
ctx: TelnetSessionContext = telnet_writer.ctx

def _on_ga_or_eor(_cmd: bytes) -> None:
nonlocal server_uses_ga
server_uses_ga = True
prompt_event.set()
ar = ctx.autoreply_engine
if ar is not None:
ar.on_prompt()

from .telopt import GA, CMD_EOR

telnet_writer.set_iac_callback(GA, _on_prompt_signal_raw)
telnet_writer.set_iac_callback(CMD_EOR, _on_prompt_signal_raw)
telnet_writer.set_iac_callback(GA, _on_ga_or_eor)
telnet_writer.set_iac_callback(CMD_EOR, _on_ga_or_eor)

async def _wait_for_prompt() -> None:
"""
Wait for the next prompt signal before the autoreply engine sends a reply.

async def _wait_for_prompt_raw() -> None:
if not ga_detected_raw:
No-op until the first GA/EOR confirms this server uses prompt signalling.
After that, blocks until :func:`_on_ga_or_eor` fires the event, then clears
it to arm the wait for the following prompt. A 2-second safety timeout
prevents stalling if the server stops sending GA mid-session.
"""
if not server_uses_ga:
return
try:
await asyncio.wait_for(prompt_ready_raw.wait(), timeout=2.0)
await asyncio.wait_for(prompt_event.wait(), timeout=2.0)
except asyncio.TimeoutError:
pass
prompt_ready_raw.clear()
prompt_event.clear()

_sh_ctx.autoreply_wait_fn = _wait_for_prompt_raw
ctx.autoreply_wait_fn = _wait_for_prompt

escape_name = accessories.name_unicode(keyboard_escape)
banner_sep = "\r\n" if tty_shell._istty else linesep
stdout.write(f"Escape character is '{escape_name}'.{banner_sep}".encode())

def _handle_close(msg: str) -> None:
# \033[m resets all SGR attributes so server-set colours do not
# bleed into the terminal after disconnect.
stdout.write(f"\033[m{linesep}{msg}{linesep}".encode())
tty_shell.cleanup_winch()

def _want_repl() -> bool:
def _should_reactivate_repl() -> bool:
# Extension point for callers that embed a REPL (e.g. a MUD client).
# Return True to break _raw_event_loop and return to the REPL when
# the server puts the terminal back into local mode. The base shell
# has no REPL, so this always returns False.
return False

# Standard event loop (byte-at-a-time).
# Wait up to 50 ms for subsequent WILL ECHO / WILL SGA packets to arrive before
# committing to a terminal mode.
#
# check_negotiation() declares the handshake complete as soon as TTYPE and NEW_ENVIRON /
# CHARSET are settled, without waiting for ECHO / SGA. Those options typically travel
# in the same "initial negotiation burst" but may not have not yet have "arrived" at
# this point in our TCP read until a few milliseconds later. Servers that never send
# WILL ECHO (rlogin, basically) simply time out and proceed correctly.
raw_mode = _get_raw_mode(telnet_writer)
if raw_mode is not False and tty_shell._istty:
try:
await asyncio.wait_for(
telnet_writer.wait_for_condition(lambda w: w.mode != "local"), timeout=0.05
)
except (asyncio.TimeoutError, asyncio.CancelledError):
pass

# Commit the terminal to raw mode now that will_echo is stable. suppress_echo=True
# disables the kernel's local ECHO because the server will echo (or we handle it in
# software). local_echo is set to True only when the server will NOT echo, so we
# reproduce keystrokes ourselves.
if not switched_to_raw and tty_shell._istty and tty_shell._save_mode is not None:
tty_shell.set_mode(tty_shell._make_raw(tty_shell._save_mode, suppress_echo=True))
switched_to_raw = True
Expand All @@ -871,6 +952,6 @@ def _want_repl() -> bool:
keyboard_escape,
state,
_handle_close,
_want_repl,
_should_reactivate_repl,
)
tty_shell.disconnect_stdin(stdin)
5 changes: 3 additions & 2 deletions telnetlib3/fingerprinting.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,9 @@ async def probe_client_capabilities(

await writer.drain()

deadline = asyncio.get_event_loop().time() + timeout
while asyncio.get_event_loop().time() < deadline:
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout
while loop.time() < deadline:
all_responded = all(
writer.remote_option.get(opt) is not None
for opt, name, desc in to_probe
Expand Down
4 changes: 2 additions & 2 deletions telnetlib3/fingerprinting_display.py
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ def _build_seen_counts(
return ""


def _color_match(term: "blessed.Terminal", name: str, score: float) -> str:
def _color_match(term: Optional["blessed.Terminal"], name: str, score: float) -> str:
"""
Color a nearest-match result by confidence threshold.

Expand All @@ -790,7 +790,7 @@ def _color_match(term: "blessed.Terminal", name: str, score: float) -> str:
def _nearest_match_lines(
data: Dict[str, Any],
names: Dict[str, str],
term: "blessed.Terminal",
term: Optional["blessed.Terminal"],
telnet_unknown: bool = False,
terminal_unknown: bool = False,
) -> List[str]:
Expand Down
8 changes: 4 additions & 4 deletions telnetlib3/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ async def _upgrade_to_tls(self) -> None:
(rewritten in 3.11). See
https://github.com/python/cpython/issues/79156
"""
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
assert self._transport is not None
protocol = self._real_factory()
try:
Expand Down Expand Up @@ -1138,13 +1138,13 @@ def _make_telnet_protocol() -> asyncio.Protocol:
def factory() -> asyncio.Protocol:
return _TLSAutoDetectProtocol(ssl, _make_telnet_protocol)

telnet_server._server = await asyncio.get_event_loop().create_server(factory, host, port)
telnet_server._server = await asyncio.get_running_loop().create_server(factory, host, port)
else:

def factory() -> asyncio.Protocol:
return _make_telnet_protocol()

telnet_server._server = await asyncio.get_event_loop().create_server(
telnet_server._server = await asyncio.get_running_loop().create_server(
factory, host, port, ssl=ssl
)

Expand Down Expand Up @@ -1392,7 +1392,7 @@ async def guarded_shell(
_cfg_mapping = ", ".join((f"{field}={{{field}}}" for field in CONFIG._fields)).format(**_locals)
logger.debug("Server configuration: %s", _cfg_mapping)

loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()

# bind
server = await create_server(
Expand Down
2 changes: 1 addition & 1 deletion telnetlib3/server_fingerprinting.py
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ async def _read_banner_until_quiet(
stripped_accum = bytearray()
esc_responded = False
menu_responded = False
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
deadline = loop.time() + max_wait
while loop.time() < deadline:
remaining = min(quiet_time, deadline - loop.time())
Expand Down
4 changes: 2 additions & 2 deletions telnetlib3/server_pty_shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ async def run(self) -> None:
"""Bridge loop between telnet and PTY."""
import errno

loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
pty_read_event = asyncio.Event()
pty_data_queue: asyncio.Queue[bytes] = asyncio.Queue()

Expand Down Expand Up @@ -583,7 +583,7 @@ async def _wait_for_terminal_info(
:param writer: TelnetWriter instance.
:param timeout: Maximum time to wait in seconds.
"""
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
start = loop.time()

while loop.time() - start < timeout:
Expand Down
2 changes: 1 addition & 1 deletion telnetlib3/tests/accessories.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async def asyncio_server(protocol_factory, host, port):
class TrackingProtocol(_TrackingProtocol, protocol_factory):
_transports = transports

server = await asyncio.get_event_loop().create_server(TrackingProtocol, host, port)
server = await asyncio.get_running_loop().create_server(TrackingProtocol, host, port)
try:
yield server
finally:
Expand Down
2 changes: 1 addition & 1 deletion telnetlib3/tests/test_client_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ async def test_begin_shell_cancelled_future():
client = BaseClient.__new__(BaseClient)
client.log = types.SimpleNamespace(debug=lambda *a, **kw: None, isEnabledFor=lambda _: False)
client.shell = lambda r, w: None
fut = asyncio.get_event_loop().create_future()
fut = asyncio.get_running_loop().create_future()
fut.cancel()
client.begin_shell(fut)

Expand Down
7 changes: 4 additions & 3 deletions telnetlib3/tests/test_pty_shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ def begin_shell(self, result):
await writer.drain()

result = ""
deadline = asyncio.get_event_loop().time() + 2.0
loop = asyncio.get_running_loop()
deadline = loop.time() + 2.0
while "hello world" not in result:
remaining = deadline - asyncio.get_event_loop().time()
remaining = deadline - loop.time()
if remaining <= 0:
break
chunk = await asyncio.wait_for(reader.read(50), remaining)
Expand Down Expand Up @@ -892,7 +893,7 @@ async def noop_bridge(*a):

with (
patch("os.waitpid", return_value=(0, 0)),
patch("asyncio.get_event_loop", return_value=mock_loop),
patch("asyncio.get_running_loop", return_value=mock_loop),
patch.object(session, "_bridge_loop", side_effect=noop_bridge),
):
await session.run()
Expand Down
Loading
Loading