131 changes: 125 additions & 6 deletions mssql_python/cursor.py
@@ -2527,67 +2527,132 @@ def _bulkcopy(
ValueError: If table_name is empty or parameters are invalid
RuntimeError: If connection string is not available
"""
# Fast check if logging is enabled to avoid overhead
is_logging = logger.is_debug_enabled

if is_logging:
logger.debug(
"_bulkcopy: Starting bulk copy operation - table=%s, batch_size=%d, timeout=%d, "
"keep_identity=%s, check_constraints=%s, table_lock=%s, keep_nulls=%s, "
"fire_triggers=%s, use_internal_transaction=%s",
table_name,
batch_size,
timeout,
keep_identity,
check_constraints,
table_lock,
keep_nulls,
fire_triggers,
use_internal_transaction,
)

try:
import mssql_py_core

if is_logging:
logger.debug("_bulkcopy: mssql_py_core module imported successfully")
except ImportError as exc:
if is_logging:
logger.error("_bulkcopy: Failed to import mssql_py_core module")
raise ImportError(
"Bulk copy requires the mssql_py_core library which is not installed. "
"To install, run: pip install mssql_py_core "
) from exc

# Validate inputs
if is_logging:
logger.debug("_bulkcopy: Validating input parameters")
if not table_name or not isinstance(table_name, str):
if is_logging:
logger.error("_bulkcopy: Invalid table_name parameter")
raise ValueError("table_name must be a non-empty string")

# Validate that data is iterable (but not a string or bytes, which are technically iterable)
if data is None:
if is_logging:
logger.error("_bulkcopy: data parameter is None")
raise TypeError("data must be an iterable of tuples or lists, got None")
if isinstance(data, (str, bytes)):
if is_logging:
logger.error(
"_bulkcopy: data parameter is string or bytes, not valid row collection"
)
raise TypeError(
f"data must be an iterable of tuples or lists, got {type(data).__name__}. "
"Strings and bytes are not valid row collections."
)
if not hasattr(data, "__iter__"):
if is_logging:
logger.error("_bulkcopy: data parameter is not iterable")
raise TypeError(
f"data must be an iterable of tuples or lists, got non-iterable {type(data).__name__}"
)
if is_logging:
logger.debug("_bulkcopy: Data parameter validation successful")

# Validate batch_size type and value (0 means server optimal)
if not isinstance(batch_size, int):
if is_logging:
logger.error("_bulkcopy: Invalid batch_size type: %s", type(batch_size).__name__)
raise TypeError(
f"batch_size must be a non-negative integer, got {type(batch_size).__name__}"
)
if batch_size < 0:
if is_logging:
logger.error("_bulkcopy: Invalid batch_size value: %d", batch_size)
raise ValueError(f"batch_size must be non-negative, got {batch_size}")

# Validate timeout type and value
if not isinstance(timeout, int):
if is_logging:
logger.error("_bulkcopy: Invalid timeout type: %s", type(timeout).__name__)
raise TypeError(f"timeout must be a positive integer, got {type(timeout).__name__}")
if timeout <= 0:
if is_logging:
logger.error("_bulkcopy: Invalid timeout value: %d", timeout)
raise ValueError(f"timeout must be positive, got {timeout}")

# Get and parse connection string
if is_logging:
logger.debug("_bulkcopy: Retrieving connection string")
if not hasattr(self.connection, "connection_str"):
if is_logging:
logger.error("_bulkcopy: Connection string not available")
raise RuntimeError("Connection string not available for bulk copy")

# Use the proper connection string parser that handles braced values
from mssql_python.connection_string_parser import _ConnectionStringParser

if is_logging:
logger.debug("_bulkcopy: Parsing connection string")
parser = _ConnectionStringParser(validate_keywords=False)
params = parser._parse(self.connection.connection_str)

if not params.get("server"):
if is_logging:
logger.error("_bulkcopy: SERVER parameter missing in connection string")
raise ValueError("SERVER parameter is required in connection string")

if not params.get("database"):
if is_logging:
logger.error("_bulkcopy: DATABASE parameter missing in connection string")
raise ValueError(
"DATABASE parameter is required in connection string for bulk copy. "
"Specify the target database explicitly to avoid accidentally writing to system databases."
)

if is_logging:
logger.debug(
"_bulkcopy: Connection parameters - server=%s, database=%s",
params.get("server"),
params.get("database"),
)

# Build connection context for bulk copy library
# Note: Password is extracted separately to avoid storing it in the main context
# dict that could be accidentally logged or exposed in error messages.
if is_logging:
logger.debug("_bulkcopy: Building connection context")
trust_cert = params.get("trustservercertificate", "yes").lower() in ("yes", "true")

# Parse encryption setting from connection string
@@ -2604,6 +2669,13 @@ def _bulkcopy(
else:
encryption = "Optional"

if is_logging:
logger.debug(
"_bulkcopy: Connection security - encryption=%s, trust_cert=%s",
encryption,
trust_cert,
)

context = {
"server": params.get("server"),
"database": params.get("database"),
@@ -2638,12 +2710,43 @@ def _bulkcopy(
pycore_context["user_name"] = params.get("uid", "")
pycore_context["password"] = params.get("pwd", "")

# Log column mappings if provided (only when logging is enabled)
if is_logging and column_mappings:
if isinstance(column_mappings, list) and column_mappings:
if isinstance(column_mappings[0], tuple):
logger.debug(
"_bulkcopy: Using advanced column mappings with %d mapping(s)",
len(column_mappings),
)
else:
logger.debug(
"_bulkcopy: Using simple column mappings with %d column(s)",
len(column_mappings),
)
elif is_logging:
logger.debug("_bulkcopy: No column mappings provided, using ordinal position mapping")

pycore_connection = None
pycore_cursor = None
try:
pycore_connection = mssql_py_core.PyCoreConnection(pycore_context)
if is_logging:
logger.debug("_bulkcopy: Creating PyCoreConnection")
# Only pass logger to Rust if logging is enabled (performance optimization)
pycore_connection = mssql_py_core.PyCoreConnection(
pycore_context, python_logger=logger if is_logging else None
)
if is_logging:
logger.debug("_bulkcopy: PyCoreConnection created successfully")

logger.debug("_bulkcopy: Creating PyCoreCursor")
pycore_cursor = pycore_connection.cursor()
if is_logging:
logger.debug("_bulkcopy: PyCoreCursor created successfully")

# Call bulkcopy with explicit keyword arguments
# The API signature: bulkcopy(table_name, data_source, batch_size=0, timeout=30, ...)
if is_logging:
logger.info("_bulkcopy: Executing bulk copy operation to table '%s'", table_name)
result = pycore_cursor.bulkcopy(
table_name,
iter(data),
@@ -2656,8 +2759,17 @@ def _bulkcopy(
keep_nulls=keep_nulls,
fire_triggers=fire_triggers,
use_internal_transaction=use_internal_transaction,
python_logger=logger if is_logging else None, # Only pass logger if enabled
)

if is_logging:
logger.info(
"_bulkcopy: Bulk copy completed successfully - rows_copied=%s, batch_count=%s, elapsed_time=%s",
result.get("rows_copied", "N/A"),
result.get("batch_count", "N/A"),
result.get("elapsed_time", "N/A"),
)

return result

except Exception as e:
@@ -2673,6 +2785,8 @@ def _bulkcopy(
raise type(e)(str(e)) from None

finally:
if is_logging:
logger.debug("_bulkcopy: Starting cleanup")
# Clear sensitive data to minimize memory exposure
if pycore_context:
pycore_context.pop("password", None)
@@ -2682,15 +2796,20 @@ def _bulkcopy(
for resource in (pycore_cursor, pycore_connection):
if resource and hasattr(resource, "close"):
try:
if is_logging:
logger.debug("_bulkcopy: Closing resource %s", type(resource).__name__)
resource.close()
except Exception as cleanup_error:
# Log cleanup errors at debug level to aid troubleshooting
# without masking the original exception
logger.debug(
"Failed to close bulk copy resource %s: %s",
type(resource).__name__,
cleanup_error,
)
if is_logging:
logger.debug(
"Failed to close bulk copy resource %s: %s",
type(resource).__name__,
cleanup_error,
)
if is_logging:
logger.debug("_bulkcopy: Cleanup completed")

def __enter__(self):
"""
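For orientation, here is a minimal usage sketch of the bulk copy path added above. It is illustrative only: the connect() entry point, connection string values, table name, and row data are assumptions, while the keyword arguments and the rows_copied/batch_count/elapsed_time result keys come from the diff itself.

```python
# Hypothetical usage sketch of the _bulkcopy path in this diff.
# connect(), the connection string, and the table/rows are placeholders;
# the keyword arguments mirror the parameters validated above.
import mssql_python

conn = mssql_python.connect(
    "Server=myserver;Database=mydb;Uid=myuser;Pwd=secret;Encrypt=yes;"
)
cursor = conn.cursor()

rows = [(1, "alice"), (2, "bob"), (3, "carol")]

result = cursor._bulkcopy(
    "dbo.users",     # target table (non-empty string, validated above)
    rows,            # any iterable of tuples/lists; str/bytes are rejected
    batch_size=0,    # 0 lets the server choose an optimal batch size
    timeout=30,      # seconds; must be a positive int
    keep_identity=True,
    table_lock=True,
)
print(result.get("rows_copied"), result.get("batch_count"), result.get("elapsed_time"))
```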
80 changes: 68 additions & 12 deletions mssql_python/logging.py
@@ -104,6 +104,10 @@ def __init__(self):
self._handler_lock = threading.RLock() # Reentrant lock for handler operations
self._cleanup_registered = False # Track if atexit cleanup is registered

# Cached level for fast checks (avoid repeated isEnabledFor calls)
self._cached_level = logging.CRITICAL
self._is_debug_enabled = False

# Don't setup handlers yet - do it lazily when setLevel is called
# This prevents creating log files when user changes output mode before enabling logging

@@ -145,15 +149,20 @@ def _setup_handlers(self):
# Custom formatter to extract source from message and format as CSV
class CSVFormatter(logging.Formatter):
def format(self, record):
# Extract source from message (e.g., [Python] or [DDBC])
msg = record.getMessage()
if msg.startswith("[") and "]" in msg:
end_bracket = msg.index("]")
source = msg[1:end_bracket]
message = msg[end_bracket + 2 :].strip() # Skip '] '
# Check if this is from py-core (via py_core_log method)
if hasattr(record, "funcName") and record.funcName == "py-core":
source = "py-core"
message = record.getMessage()
else:
source = "Unknown"
message = msg
# Extract source from message (e.g., [Python] or [DDBC])
msg = record.getMessage()
if msg.startswith("[") and "]" in msg:
end_bracket = msg.index("]")
source = msg[1:end_bracket]
message = msg[end_bracket + 2 :].strip() # Skip '] '
else:
source = "Unknown"
message = msg

# Format timestamp with milliseconds using period separator
timestamp = self.formatTime(record, "%Y-%m-%d %H:%M:%S")
@@ -326,6 +335,42 @@ def _write_log_header(self):
pass # Even stderr notification failed
# Don't crash - logging continues without header

def py_core_log(self, level: int, msg: str, filename: str = "cursor.rs", lineno: int = 0):
"""
Logging method for py-core (Rust/TDS) code with custom source location.

Args:
level: Log level (DEBUG, INFO, WARNING, ERROR)
msg: Message string (already formatted)
filename: Source filename (e.g., 'cursor.rs')
lineno: Line number in source file
"""
try:
if not self._logger.isEnabledFor(level):
[Copilot review comment, Feb 16, 2026]
The py_core_log method uses self._logger.isEnabledFor(level) instead of the cached level check. For consistency with the performance optimization introduced in the _log method, and to avoid redundant isEnabledFor calls, this should use the cached level check: "if level < self._cached_level: return". This would maintain the same performance benefit that was the goal of introducing the cached level.

Suggested change:
-            if not self._logger.isEnabledFor(level):
+            if level < self._cached_level:
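As a standalone illustration of the performance point in this comment (not project code), the sketch below compares Logger.isEnabledFor() against a cached integer comparison using only the standard library; absolute timings will vary by machine.

```python
# Rough micro-benchmark: Logger.isEnabledFor() vs. a cached level comparison
# on a logger whose level filters the message out (the common hot path).
import logging
import timeit

log = logging.getLogger("bench")
log.setLevel(logging.CRITICAL)
cached_level = log.level  # analogous to the PR's _cached_level

n = 1_000_000
t_method = timeit.timeit(lambda: log.isEnabledFor(logging.DEBUG), number=n)
t_cached = timeit.timeit(lambda: logging.DEBUG >= cached_level, number=n)
print(f"isEnabledFor: {t_method:.3f}s   cached compare: {t_cached:.3f}s")
```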
return

# Create a custom LogRecord with Rust source location
import logging as log_module

record = log_module.LogRecord(
name=self._logger.name,
level=level,
pathname=filename,
lineno=lineno,
msg=msg,
args=(),
exc_info=None,
func="py-core",
sinfo=None,
)
self._logger.handle(record)
except Exception:
# Fallback - use regular logging
try:
self._logger.log(level, msg)
except:
pass

def _log(self, level: int, msg: str, add_prefix: bool = True, *args, **kwargs):
"""
Internal logging method with exception safety.
@@ -352,8 +397,9 @@ def _log(self, level: int, msg: str, add_prefix: bool = True, *args, **kwargs):
All other failures are silently ignored to prevent app crashes.
"""
try:
# Fast level check (zero overhead if disabled)
if not self._logger.isEnabledFor(level):
# Fast level check using cached level (zero overhead if disabled)
# This avoids the overhead of isEnabledFor() method call
if level < self._cached_level:
return

# Add prefix if requested (only after level check)
@@ -364,8 +410,9 @@ def _log(self, level: int, msg: str, add_prefix: bool = True, *args, **kwargs):
if args:
msg = msg % args

# Log the message (no args since already formatted)
self._logger.log(level, msg, **kwargs)
# Log the message with proper stack level to capture caller's location
# stacklevel=3 skips: _log -> debug/info/warning/error -> actual caller
self._logger.log(level, msg, stacklevel=3, **kwargs)
except Exception:
# Last resort: Try stderr fallback for any logging failure
# This helps diagnose critical issues (disk full, permission denied, etc.)
@@ -441,6 +488,10 @@ def _setLevel(
# Set level (atomic operation, no lock needed)
self._logger.setLevel(level)

# Cache level for fast checks (avoid repeated isEnabledFor calls)
[Copilot review comment, Feb 16, 2026]
The cached level variables (_cached_level and _is_debug_enabled) are updated without synchronization, which could lead to thread safety issues. While reading these primitives is generally atomic in CPython due to the GIL, there's a potential race condition: if one thread reads _is_debug_enabled right after _cached_level is updated but before _is_debug_enabled is updated on line 493, it could see an inconsistent state (e.g., _cached_level=DEBUG but _is_debug_enabled=False). Consider updating both values atomically or documenting that this benign race condition is acceptable (since it would only cause a brief period where logging might be incorrectly enabled/disabled until the next check).

Suggested change:
-        # Cache level for fast checks (avoid repeated isEnabledFor calls)
+        # Cache level for fast checks (avoid repeated isEnabledFor calls).
+        # NOTE: These two assignments are not synchronized together. This can cause
+        # a brief window where another thread may observe an inconsistent state
+        # (e.g., _cached_level == DEBUG but _is_debug_enabled is still False).
+        # This is considered acceptable because the underlying logger remains the
+        # single source of truth for effective log level checks, and any
+        # inconsistency only affects logging decisions transiently.
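One possible way to remove the inconsistent-state window described above, sketched as standalone code rather than the PR's implementation: publish both cached values through a single attribute assignment, which is one atomic store under CPython's GIL. Guarding the two assignments with the existing _handler_lock would be an alternative.

```python
# Sketch: publish both cached values in one attribute assignment so readers
# never observe a mixed state (cached level updated, debug flag stale).
import logging

class LevelCache:
    def __init__(self) -> None:
        self._cache = (logging.CRITICAL, False)  # (cached_level, is_debug_enabled)

    def set_level(self, level: int) -> None:
        # Build the pair first, then publish it with a single store.
        self._cache = (level, level <= logging.DEBUG)

    @property
    def is_debug_enabled(self) -> bool:
        return self._cache[1]

    def is_enabled_for(self, level: int) -> bool:
        return level >= self._cache[0]
```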
self._cached_level = level
self._is_debug_enabled = level <= logging.DEBUG

# Notify C++ bridge of level change
self._notify_cpp_level_change(level)

@@ -546,6 +597,11 @@ def level(self) -> int:
"""Get the current logging level"""
return self._logger.level

@property
def is_debug_enabled(self) -> bool:
"""Fast check if debug logging is enabled (cached for performance)"""
return self._is_debug_enabled


[Copilot review comment on lines +602 to 605, Feb 16, 2026]
The new is_debug_enabled property and py_core_log method lack test coverage. Given that tests/test_007_logging.py has comprehensive logging tests, these new features should have corresponding tests to verify: 1) is_debug_enabled correctly reflects the logging level state, 2) is_debug_enabled updates when setLevel is called, 3) py_core_log correctly formats records with custom source locations, 4) py_core_log falls back gracefully on errors.

Suggested change:
-        """Fast check if debug logging is enabled (cached for performance)"""
-        return self._is_debug_enabled
+        """Fast check if debug logging is enabled (cached for performance)
+
+        This uses getattr with a default to avoid AttributeError if the
+        internal cache has not yet been initialized.
+        """
+        return getattr(self, "_is_debug_enabled", False)
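A hedged test sketch for the coverage gap noted above. It assumes mssql_python.logging exports the same logger singleton that cursor.py imports, that the internal _setLevel shown in this diff is callable from tests, and that records reach pytest's caplog (i.e., the library logger propagates to the root logger); none of that is confirmed by the diff.

```python
# Hypothetical tests for is_debug_enabled and py_core_log; names and import
# paths are assumptions based on this diff, not verified project API.
import logging as std_logging

from mssql_python.logging import logger


def test_is_debug_enabled_tracks_level():
    logger._setLevel(std_logging.CRITICAL)
    assert logger.is_debug_enabled is False
    logger._setLevel(std_logging.DEBUG)
    assert logger.is_debug_enabled is True


def test_py_core_log_sets_rust_source_location(caplog):
    logger._setLevel(std_logging.DEBUG)
    # Assumes the library logger propagates so caplog can capture the record.
    with caplog.at_level(std_logging.DEBUG):
        logger.py_core_log(std_logging.DEBUG, "row fetched", filename="cursor.rs", lineno=42)
    record = caplog.records[-1]
    assert record.funcName == "py-core"
    assert record.pathname == "cursor.rs"
    assert record.lineno == 42
```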
# ============================================================================
# Module-level exports (Primary API)