Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

### Added ("feat")

* winrm.py: execute ps in `run_ps()` directly without Invoke-Expression wrapping
* disk.py: add `get_owner()`
* nextcloud.py
* txt.py: add `exception2text()`
Expand Down
72 changes: 47 additions & 25 deletions winrm.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
"""

__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2025111402'


import shlex
__version__ = '2026022501'

try:
import winrm
Expand All @@ -31,7 +28,6 @@

from . import txt


def run_cmd(args, cmd, params=None):
"""
Run a native command on a remote Windows host via WinRM/PSRP and return a
Expand Down Expand Up @@ -133,9 +129,7 @@ def run_cmd(args, cmd, params=None):
cert_validation=True,
)

full_command = shlex.join([cmd] + params)
stdout, stderr, rc = session.execute_cmd(full_command)

stdout, stderr, rc = session.execute_cmd(cmd, args=params)
return {
'retc': rc,
'stdout': txt.to_text(stdout),
Expand Down Expand Up @@ -177,9 +171,9 @@ def run_cmd(args, cmd, params=None):
}


def run_ps(args, cmd):
def run_ps(args, cmd, params=None):
"""
Run a PowerShell script/string on a remote Windows host via WinRM/PSRP and
Run a PowerShell cmdlet on a remote Windows host via WinRM/PSRP and
return a normalized result dictionary.

Prefers **pypsrp (PSRP)** if available (best for JEA/PowerShell Remoting);
Expand All @@ -202,7 +196,12 @@ def run_ps(args, cmd):
name (JEA endpoint). Defaults to `'Microsoft.PowerShell'` if unset.
Only supported with **pypsrp**.
(Additional attributes may be honored by the underlying libraries if present.)
- **cmd** (`str`): PowerShell scriptblock/string to execute remotely.
- **cmd** (`str`): PowerShell cmdlet name to execute remotely (e.g. `'Get-Service'`).
When using pypsrp, this is passed directly as a cmdlet to the pipeline so JEA
can properly allow/deny it. When falling back to pywinrm, it is passed as a
scriptblock string.
- **params** (`list[str]`, optional): Positional arguments passed to the cmdlet.
Only used with pypsrp. Defaults to `[]`.

### Returns
- **dict**: A normalized result with:
Expand All @@ -217,7 +216,8 @@ def run_ps(args, cmd):
### Behavior
- Maps `WINRM_TRANSPORT` to PSRP auth (`kerberos`, `negotiate`, `credssp`, `basic`)
and decides SSL/port (5986 for SSL, 5985 otherwise) when using **pypsrp**,
then executes via `Client.execute_ps()`.
then executes via a direct PSRP pipeline (RunspacePool + PowerShell) to avoid
Invoke-Expression wrapping, ensuring JEA can enforce allow/deny on the cmdlet name.
- Falls back to **pywinrm** and executes via `Session.run_ps()` if pypsrp is not available.
- For Kerberos authentication: if `WINRM_USERNAME` and `WINRM_PASSWORD` are not provided
(or are empty/None), the function will attempt to use existing Kerberos credentials
Expand All @@ -235,7 +235,7 @@ def run_ps(args, cmd):
{'retc': 0, 'stdout': 'Name Id\\r\\n---- --\\r\\n...\\r\\n', 'stderr': ''}
>>> # With custom configuration name (JEA endpoint):
>>> args.WINRM_CONFIGURATION_NAME = 'MyJEAEndpoint'
>>> run_ps(args, "Get-Service", [])
>>> run_ps(args, "Get-Service", ["servicename"])
{'retc': 0, 'stdout': '...','stderr': ''}
"""
# Determine authentication credentials
Expand All @@ -256,8 +256,22 @@ def run_ps(args, cmd):
if getattr(args, 'WINRM_DOMAIN', None):
auth = (f'{username}@{args.WINRM_DOMAIN}', password)

if params is None:
params = []

configuration_name = getattr(args, 'WINRM_CONFIGURATION_NAME', None)
if configuration_name and not HAVE_JEA:
return {
'retc': 1,
'stdout': '',
'stderr': 'WINRM_CONFIGURATION_NAME requires pypsrp (JEA). Install pypsrp or unset --winrm-configuration-name.',
}

if HAVE_JEA:
try:
from pypsrp.powershell import PowerShell, RunspacePool
from pypsrp.wsman import WSMan

# translate pywinrm transport -> pypsrp auth/ssl/port
_auth_map = {
'kerberos': 'kerberos',
Expand All @@ -272,8 +286,7 @@ def run_ps(args, cmd):
_use_ssl = (_transport == 'ssl')
_port = 5986 if _use_ssl else 5985

# create PSRP client (like in winrm.Session)
session = Client(
wsman = WSMan(
server=args.WINRM_HOSTNAME,
username=auth[0],
password=auth[1],
Expand All @@ -283,16 +296,25 @@ def run_ps(args, cmd):
cert_validation=True,
)

# run PowerShell
_configuration_name = getattr(args, 'WINRM_CONFIGURATION_NAME', None)
if _configuration_name:
stdout, streams, had_errors = session.execute_ps(cmd, configuration_name=_configuration_name)
else:
stdout, streams, had_errors = session.execute_ps(cmd)
if not params:
parts = cmd.split()
cmd = parts[0]
params = parts[1:]

# stdout is already a string; stderr from PSRP error stream(s)
# Use RunspacePool + PowerShell directly to avoid Invoke-Expression wrapping, so JEA can properly allow/deny
# the cmdlet by name rather than seeing a raw string blob.
with RunspacePool(wsman, configuration_name=configuration_name or 'Microsoft.PowerShell') as pool:
ps = PowerShell(pool)
ps.add_cmdlet(cmd)
for param in params:
ps.add_argument(param)
output = ps.invoke()

stdout = '\n'.join([str(o) for o in output])

# stderr from PSRP error stream(s)
stderr_lines = []
for err in getattr(streams, 'error', []):
for err in ps.streams.error:
# err.to_string() gives a readable message with category/position if available
try:
stderr_lines.append(err.to_string())
Expand All @@ -303,7 +325,7 @@ def run_ps(args, cmd):
stderr = '\n'.join(stderr_lines)

result = {
'retc': 0 if not had_errors else 1,
'retc': 0 if not ps.had_errors else 1,
'stdout': txt.to_text(stdout),
'stderr': txt.to_text(stderr),
}
Expand Down Expand Up @@ -348,4 +370,4 @@ def run_ps(args, cmd):
'retc': 1,
'stdout': '',
'stderr': 'No compatible remoting library available (pypsrp or pywinrm).',
}
}