diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 0e28168e..427ea86d 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -2527,67 +2527,132 @@ def _bulkcopy( ValueError: If table_name is empty or parameters are invalid RuntimeError: If connection string is not available """ + # Fast check if logging is enabled to avoid overhead + is_logging = logger.is_debug_enabled + + if is_logging: + logger.debug( + "_bulkcopy: Starting bulk copy operation - table=%s, batch_size=%d, timeout=%d, " + "keep_identity=%s, check_constraints=%s, table_lock=%s, keep_nulls=%s, " + "fire_triggers=%s, use_internal_transaction=%s", + table_name, + batch_size, + timeout, + keep_identity, + check_constraints, + table_lock, + keep_nulls, + fire_triggers, + use_internal_transaction, + ) + try: import mssql_py_core + + if is_logging: + logger.debug("_bulkcopy: mssql_py_core module imported successfully") except ImportError as exc: + if is_logging: + logger.error("_bulkcopy: Failed to import mssql_py_core module") raise ImportError( "Bulk copy requires the mssql_py_core library which is not installed. " "To install, run: pip install mssql_py_core " ) from exc # Validate inputs + if is_logging: + logger.debug("_bulkcopy: Validating input parameters") if not table_name or not isinstance(table_name, str): + if is_logging: + logger.error("_bulkcopy: Invalid table_name parameter") raise ValueError("table_name must be a non-empty string") # Validate that data is iterable (but not a string or bytes, which are technically iterable) if data is None: + if is_logging: + logger.error("_bulkcopy: data parameter is None") raise TypeError("data must be an iterable of tuples or lists, got None") if isinstance(data, (str, bytes)): + if is_logging: + logger.error( + "_bulkcopy: data parameter is string or bytes, not valid row collection" + ) raise TypeError( f"data must be an iterable of tuples or lists, got {type(data).__name__}. 
" "Strings and bytes are not valid row collections." ) if not hasattr(data, "__iter__"): + if is_logging: + logger.error("_bulkcopy: data parameter is not iterable") raise TypeError( f"data must be an iterable of tuples or lists, got non-iterable {type(data).__name__}" ) + if is_logging: + logger.debug("_bulkcopy: Data parameter validation successful") # Validate batch_size type and value (0 means server optimal) if not isinstance(batch_size, int): + if is_logging: + logger.error("_bulkcopy: Invalid batch_size type: %s", type(batch_size).__name__) raise TypeError( f"batch_size must be a non-negative integer, got {type(batch_size).__name__}" ) if batch_size < 0: + if is_logging: + logger.error("_bulkcopy: Invalid batch_size value: %d", batch_size) raise ValueError(f"batch_size must be non-negative, got {batch_size}") # Validate timeout type and value if not isinstance(timeout, int): + if is_logging: + logger.error("_bulkcopy: Invalid timeout type: %s", type(timeout).__name__) raise TypeError(f"timeout must be a positive integer, got {type(timeout).__name__}") if timeout <= 0: + if is_logging: + logger.error("_bulkcopy: Invalid timeout value: %d", timeout) raise ValueError(f"timeout must be positive, got {timeout}") # Get and parse connection string + if is_logging: + logger.debug("_bulkcopy: Retrieving connection string") if not hasattr(self.connection, "connection_str"): + if is_logging: + logger.error("_bulkcopy: Connection string not available") raise RuntimeError("Connection string not available for bulk copy") # Use the proper connection string parser that handles braced values from mssql_python.connection_string_parser import _ConnectionStringParser + if is_logging: + logger.debug("_bulkcopy: Parsing connection string") parser = _ConnectionStringParser(validate_keywords=False) params = parser._parse(self.connection.connection_str) if not params.get("server"): + if is_logging: + logger.error("_bulkcopy: SERVER parameter missing in connection string") raise 
ValueError("SERVER parameter is required in connection string") if not params.get("database"): + if is_logging: + logger.error("_bulkcopy: DATABASE parameter missing in connection string") raise ValueError( "DATABASE parameter is required in connection string for bulk copy. " "Specify the target database explicitly to avoid accidentally writing to system databases." ) + if is_logging: + logger.debug( + "_bulkcopy: Connection parameters - server=%s, database=%s", + params.get("server"), + params.get("database"), + ) + # Build connection context for bulk copy library # Note: Password is extracted separately to avoid storing it in the main context # dict that could be accidentally logged or exposed in error messages. + if is_logging: + logger.debug("_bulkcopy: Building connection context") trust_cert = params.get("trustservercertificate", "yes").lower() in ("yes", "true") # Parse encryption setting from connection string @@ -2604,6 +2669,13 @@ def _bulkcopy( else: encryption = "Optional" + if is_logging: + logger.debug( + "_bulkcopy: Connection security - encryption=%s, trust_cert=%s", + encryption, + trust_cert, + ) + context = { "server": params.get("server"), "database": params.get("database"), @@ -2638,12 +2710,43 @@ def _bulkcopy( pycore_context["user_name"] = params.get("uid", "") pycore_context["password"] = params.get("pwd", "") + # Log column mappings if provided (only when logging is enabled) + if is_logging and column_mappings: + if isinstance(column_mappings, list) and column_mappings: + if isinstance(column_mappings[0], tuple): + logger.debug( + "_bulkcopy: Using advanced column mappings with %d mapping(s)", + len(column_mappings), + ) + else: + logger.debug( + "_bulkcopy: Using simple column mappings with %d column(s)", + len(column_mappings), + ) + elif is_logging: + logger.debug("_bulkcopy: No column mappings provided, using ordinal position mapping") + pycore_connection = None pycore_cursor = None try: - pycore_connection = 
mssql_py_core.PyCoreConnection(pycore_context) +        if is_logging: +            logger.debug("_bulkcopy: Creating PyCoreConnection") +        # Only pass logger to Rust if logging is enabled (performance optimization) +        pycore_connection = mssql_py_core.PyCoreConnection( +            pycore_context, python_logger=logger if is_logging else None +        ) +        if is_logging: +            logger.debug("_bulkcopy: PyCoreConnection created successfully") + +        if is_logging: logger.debug("_bulkcopy: Creating PyCoreCursor") pycore_cursor = pycore_connection.cursor() +        if is_logging: +            logger.debug("_bulkcopy: PyCoreCursor created successfully") +        # Call bulkcopy with explicit keyword arguments +        # The API signature: bulkcopy(table_name, data_source, batch_size=0, timeout=30, ...) +        if is_logging: +            logger.info("_bulkcopy: Executing bulk copy operation to table '%s'", table_name) result = pycore_cursor.bulkcopy( table_name, iter(data), @@ -2656,8 +2759,17 @@ def _bulkcopy( keep_nulls=keep_nulls, fire_triggers=fire_triggers, use_internal_transaction=use_internal_transaction, +            python_logger=logger if is_logging else None,  # Only pass logger if enabled ) + +        if is_logging: +            logger.info( +                "_bulkcopy: Bulk copy completed successfully - rows_copied=%s, batch_count=%s, elapsed_time=%s", +                result.get("rows_copied", "N/A"), +                result.get("batch_count", "N/A"), +                result.get("elapsed_time", "N/A"), +            ) + return result except Exception as e: @@ -2673,6 +2785,8 @@ def _bulkcopy( raise type(e)(str(e)) from None finally: +        if is_logging: +            logger.debug("_bulkcopy: Starting cleanup") # Clear sensitive data to minimize memory exposure if pycore_context: pycore_context.pop("password", None) @@ -2682,15 +2796,20 @@ def _bulkcopy( for resource in (pycore_cursor, pycore_connection): if resource and hasattr(resource, "close"): try: +                    if is_logging: +                        logger.debug("_bulkcopy: Closing resource %s", type(resource).__name__) resource.close() except Exception as cleanup_error: # Log cleanup errors at debug level to aid troubleshooting # without masking the original
exception - logger.debug( - "Failed to close bulk copy resource %s: %s", - type(resource).__name__, - cleanup_error, - ) + if is_logging: + logger.debug( + "Failed to close bulk copy resource %s: %s", + type(resource).__name__, + cleanup_error, + ) + if is_logging: + logger.debug("_bulkcopy: Cleanup completed") def __enter__(self): """ diff --git a/mssql_python/logging.py b/mssql_python/logging.py index 2cb9361f..b0291d68 100644 --- a/mssql_python/logging.py +++ b/mssql_python/logging.py @@ -104,6 +104,10 @@ def __init__(self): self._handler_lock = threading.RLock() # Reentrant lock for handler operations self._cleanup_registered = False # Track if atexit cleanup is registered + # Cached level for fast checks (avoid repeated isEnabledFor calls) + self._cached_level = logging.CRITICAL + self._is_debug_enabled = False + # Don't setup handlers yet - do it lazily when setLevel is called # This prevents creating log files when user changes output mode before enabling logging @@ -145,15 +149,20 @@ def _setup_handlers(self): # Custom formatter to extract source from message and format as CSV class CSVFormatter(logging.Formatter): def format(self, record): - # Extract source from message (e.g., [Python] or [DDBC]) - msg = record.getMessage() - if msg.startswith("[") and "]" in msg: - end_bracket = msg.index("]") - source = msg[1:end_bracket] - message = msg[end_bracket + 2 :].strip() # Skip '] ' + # Check if this is from py-core (via py_core_log method) + if hasattr(record, "funcName") and record.funcName == "py-core": + source = "py-core" + message = record.getMessage() else: - source = "Unknown" - message = msg + # Extract source from message (e.g., [Python] or [DDBC]) + msg = record.getMessage() + if msg.startswith("[") and "]" in msg: + end_bracket = msg.index("]") + source = msg[1:end_bracket] + message = msg[end_bracket + 2 :].strip() # Skip '] ' + else: + source = "Unknown" + message = msg # Format timestamp with milliseconds using period separator timestamp = 
def py_core_log(self, level: int, msg: str, filename: str = "cursor.rs", lineno: int = 0):
    """
    Logging entry point for py-core (Rust/TDS) code with a custom source location.

    Builds a ``logging.LogRecord`` whose ``pathname``/``lineno`` point at the
    Rust source file and whose ``funcName`` is the sentinel ``"py-core"`` (the
    CSV formatter keys off this value to label the record's source), then hands
    the record to the wrapped stdlib logger.

    Args:
        level: Log level (DEBUG, INFO, WARNING, ERROR).
        msg: Message string (already formatted by the caller).
        filename: Source filename (e.g., 'cursor.rs').
        lineno: Line number in the Rust source file.
    """
    try:
        # Cheap early-out: skip all record construction when the level is
        # disabled on the underlying logger.
        if not self._logger.isEnabledFor(level):
            return

        # Build a LogRecord carrying the Rust source location.  The module
        # already imports `logging` at top level, so the previous
        # function-local `import logging as log_module` was redundant.
        record = logging.LogRecord(
            name=self._logger.name,
            level=level,
            pathname=filename,
            lineno=lineno,
            msg=msg,
            args=(),
            exc_info=None,
            func="py-core",  # sentinel consumed by the CSV formatter
            sinfo=None,
        )
        self._logger.handle(record)
    except Exception:
        # Logging must never crash the application; fall back to a plain log
        # call.  Use `except Exception` (not a bare `except:`) so SystemExit
        # and KeyboardInterrupt still propagate.
        try:
            self._logger.log(level, msg)
        except Exception:
            pass
""" try: - # Fast level check (zero overhead if disabled) - if not self._logger.isEnabledFor(level): + # Fast level check using cached level (zero overhead if disabled) + # This avoids the overhead of isEnabledFor() method call + if level < self._cached_level: return # Add prefix if requested (only after level check) @@ -364,8 +410,9 @@ def _log(self, level: int, msg: str, add_prefix: bool = True, *args, **kwargs): if args: msg = msg % args - # Log the message (no args since already formatted) - self._logger.log(level, msg, **kwargs) + # Log the message with proper stack level to capture caller's location + # stacklevel=3 skips: _log -> debug/info/warning/error -> actual caller + self._logger.log(level, msg, stacklevel=3, **kwargs) except Exception: # Last resort: Try stderr fallback for any logging failure # This helps diagnose critical issues (disk full, permission denied, etc.) @@ -441,6 +488,10 @@ def _setLevel( # Set level (atomic operation, no lock needed) self._logger.setLevel(level) + # Cache level for fast checks (avoid repeated isEnabledFor calls) + self._cached_level = level + self._is_debug_enabled = level <= logging.DEBUG + # Notify C++ bridge of level change self._notify_cpp_level_change(level) @@ -546,6 +597,11 @@ def level(self) -> int: """Get the current logging level""" return self._logger.level + @property + def is_debug_enabled(self) -> bool: + """Fast check if debug logging is enabled (cached for performance)""" + return self._is_debug_enabled + # ============================================================================ # Module-level exports (Primary API)