diff --git a/CHANGELOG.md b/CHANGELOG.md index 4db1eae..363f10f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Added ("feat") +* winrm.py: execute ps in `run_ps()` directly without Invoke-Expression wrapping * disk.py: add `get_owner()` * nextcloud.py * txt.py: add `exception2text()` diff --git a/winrm.py b/winrm.py old mode 100644 new mode 100755 index a6e3b96..84bef02 --- a/winrm.py +++ b/winrm.py @@ -12,10 +12,7 @@ """ __author__ = 'Linuxfabrik GmbH, Zurich/Switzerland' -__version__ = '2025111402' - - -import shlex +__version__ = '2026022501' try: import winrm @@ -31,7 +28,6 @@ from . import txt - def run_cmd(args, cmd, params=None): """ Run a native command on a remote Windows host via WinRM/PSRP and return a @@ -133,9 +129,7 @@ def run_cmd(args, cmd, params=None): cert_validation=True, ) - full_command = shlex.join([cmd] + params) - stdout, stderr, rc = session.execute_cmd(full_command) - + stdout, stderr, rc = session.execute_cmd(cmd, args=params) return { 'retc': rc, 'stdout': txt.to_text(stdout), @@ -177,9 +171,9 @@ def run_cmd(args, cmd, params=None): } -def run_ps(args, cmd): +def run_ps(args, cmd, params=None): """ - Run a PowerShell script/string on a remote Windows host via WinRM/PSRP and + Run a PowerShell cmdlet on a remote Windows host via WinRM/PSRP and return a normalized result dictionary. Prefers **pypsrp (PSRP)** if available (best for JEA/PowerShell Remoting); @@ -202,7 +196,12 @@ def run_ps(args, cmd): name (JEA endpoint). Defaults to `'Microsoft.PowerShell'` if unset. Only supported with **pypsrp**. (Additional attributes may be honored by the underlying libraries if present.) - - **cmd** (`str`): PowerShell scriptblock/string to execute remotely. + - **cmd** (`str`): PowerShell cmdlet name to execute remotely (e.g. `'Get-Service'`). + When using pypsrp, this is passed directly as a cmdlet to the pipeline so JEA + can properly allow/deny it. When falling back to pywinrm, it is passed as a + scriptblock string. + - **params** (`list[str]`, optional): Positional arguments passed to the cmdlet. + Only used with pypsrp. Defaults to `[]`. ### Returns - **dict**: A normalized result with: @@ -217,7 +216,8 @@ def run_ps(args, cmd): ### Behavior - Maps `WINRM_TRANSPORT` to PSRP auth (`kerberos`, `negotiate`, `credssp`, `basic`) and decides SSL/port (5986 for SSL, 5985 otherwise) when using **pypsrp**, - then executes via `Client.execute_ps()`. + then executes via a direct PSRP pipeline (RunspacePool + PowerShell) to avoid + Invoke-Expression wrapping, ensuring JEA can enforce allow/deny on the cmdlet name. - Falls back to **pywinrm** and executes via `Session.run_ps()` if pypsrp is not available. - For Kerberos authentication: if `WINRM_USERNAME` and `WINRM_PASSWORD` are not provided (or are empty/None), the function will attempt to use existing Kerberos credentials @@ -235,7 +235,7 @@ def run_ps(args, cmd): {'retc': 0, 'stdout': 'Name Id\\r\\n---- --\\r\\n...\\r\\n', 'stderr': ''} >>> # With custom configuration name (JEA endpoint): >>> args.WINRM_CONFIGURATION_NAME = 'MyJEAEndpoint' - >>> run_ps(args, "Get-Service", []) + >>> run_ps(args, "Get-Service", ["servicename"]) {'retc': 0, 'stdout': '...','stderr': ''} """ # Determine authentication credentials @@ -256,8 +256,22 @@ def run_ps(args, cmd): if getattr(args, 'WINRM_DOMAIN', None): auth = (f'{username}@{args.WINRM_DOMAIN}', password) + if params is None: + params = [] + + configuration_name = getattr(args, 'WINRM_CONFIGURATION_NAME', None) + if configuration_name and not HAVE_JEA: + return { + 'retc': 1, + 'stdout': '', + 'stderr': 'WINRM_CONFIGURATION_NAME requires pypsrp (JEA). Install pypsrp or unset --winrm-configuration-name.', + } + if HAVE_JEA: try: + from pypsrp.powershell import PowerShell, RunspacePool + from pypsrp.wsman import WSMan + # translate pywinrm transport -> pypsrp auth/ssl/port _auth_map = { 'kerberos': 'kerberos', @@ -272,8 +286,7 @@ def run_ps(args, cmd): _use_ssl = (_transport == 'ssl') _port = 5986 if _use_ssl else 5985 - # create PSRP client (like in winrm.Session) - session = Client( + wsman = WSMan( server=args.WINRM_HOSTNAME, username=auth[0], password=auth[1], @@ -283,16 +296,25 @@ def run_ps(args, cmd): cert_validation=True, ) - # run PowerShell - _configuration_name = getattr(args, 'WINRM_CONFIGURATION_NAME', None) - if _configuration_name: - stdout, streams, had_errors = session.execute_ps(cmd, configuration_name=_configuration_name) - else: - stdout, streams, had_errors = session.execute_ps(cmd) + if not params: + parts = cmd.split() + cmd = parts[0] + params = parts[1:] - # stdout is already a string; stderr from PSRP error stream(s) + # Use RunspacePool + PowerShell directly to avoid Invoke-Expression wrapping, so JEA can properly allow/deny + # the cmdlet by name rather than seeing a raw string blob. + with RunspacePool(wsman, configuration_name=configuration_name or 'Microsoft.PowerShell') as pool: + ps = PowerShell(pool) + ps.add_cmdlet(cmd) + for param in params: + ps.add_argument(param) + output = ps.invoke() + + stdout = '\n'.join([str(o) for o in output]) + + # stderr from PSRP error stream(s) stderr_lines = [] - for err in getattr(streams, 'error', []): + for err in ps.streams.error: # err.to_string() gives a readable message with category/position if available try: stderr_lines.append(err.to_string()) @@ -303,7 +325,7 @@ def run_ps(args, cmd): stderr = '\n'.join(stderr_lines) result = { - 'retc': 0 if not had_errors else 1, + 'retc': 0 if not ps.had_errors else 1, 'stdout': txt.to_text(stdout), 'stderr': txt.to_text(stderr), } @@ -348,4 +370,4 @@ def run_ps(args, cmd): 'retc': 1, 'stdout': '', 'stderr': 'No compatible remoting library available (pypsrp or pywinrm).', - } + } \ No newline at end of file