[DCP - Ingestion] Update the Preprocessing job to support the importName #515
gmechali wants to merge 1 commit into
Not up to standards ⛔
🔴 Issues
| Category | Results |
|---|---|
| UnusedCode | 1 medium |
| ErrorProne | 1 high |
| CodeStyle | 10 minor |
🟢 Metrics 4 complexity · 0 duplication
Code Review
This pull request adds support for bulk loading and processing multiple imports under subdirectories, merging their configurations, and exporting JSON-LD observations grouped by provenance. The review feedback highlights several opportunities to improve robustness: computing relative paths with a path-aware helper instead of str.replace, validating that parsed configurations are dictionaries, avoiding an UnboundLocalError when opening import directories, skipping empty import names, sanitizing directory names with a regular expression, and logging warnings for empty provenances or conflicting configuration merges.

        raise e

    dir_path = fspath.dirname(file.path)
    rel_dir = dir_path.replace(base_dir.path, "").strip("/")
Using str.replace to strip the base directory from dir_path is error-prone: replace removes every occurrence of the base path string, not just the leading one. If base_dir.path is /base and dir_path is /base/import/my_import/base, the result is /import/my_import, which silently corrupts the relative path. A prefix-aware helper is safer. Note that fs.path.relative takes a single path and only strips its leading slash, so it cannot compute a path relative to a base. Assuming fspath is PyFilesystem's fs.path, fs.path.frombase(path1, path2) returns the part of path2 that follows path1 and raises ValueError when path1 is not a prefix of path2.
Suggested change:

    rel_dir = fspath.frombase(base_dir.path, dir_path).strip("/")
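To make the failure mode concrete, here is a minimal sketch that compares the str.replace approach with a prefix-aware computation. It uses only the standard library's posixpath rather than the PR's fspath module, and relative_to_base is a hypothetical helper written for illustration, not something from the PR.

```python
import posixpath


def relative_to_base(base: str, path: str) -> str:
    """Return `path` relative to `base`; raise ValueError if it lies outside.

    Hypothetical helper: only a leading prefix is removed, never an
    occurrence of the base string further down the path.
    """
    base = posixpath.normpath(base)
    path = posixpath.normpath(path)
    if path == base:
        return ""
    prefix = base.rstrip("/") + "/"
    if not path.startswith(prefix):
        raise ValueError(f"{path!r} is not under {base!r}")
    return path[len(prefix):]


base = "/base"
nested = "/base/import/my_import/base"

# str.replace drops BOTH "/base" substrings, losing the last path component.
naive = nested.replace(base, "").strip("/")

# The prefix-aware version keeps the trailing "base" directory.
safe = relative_to_base(base, nested)

print(naive)  # import/my_import
print(safe)   # import/my_import/base
```

Raising on a path outside the base, instead of returning it unchanged, surfaces a misconfigured base directory early rather than producing an odd relative path later.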

    rows = self.db.engine.fetch_all("SELECT DISTINCT provenance FROM observations")
    provenances = [row[0] for row in rows if row[0]]
If any observations have a missing or empty provenance field, they will be completely skipped during the export because provenances only includes non-empty values. To prevent silent data loss, we should log a warning if any observations with an empty provenance are detected.
Suggested change:

    rows = self.db.engine.fetch_all("SELECT DISTINCT provenance FROM observations")
    provenances = [row[0] for row in rows if row[0]]
    if any(not row[0] for row in rows):
        logging.warning("Found observations with missing or empty provenance. These will be skipped during export!")
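A possible refinement of the warning above is to report how many rows would be skipped. The sketch below uses an in-memory sqlite3 database as a stand-in for self.db.engine; the table layout, the sample rows, and the distinct_provenances helper are all assumptions made for illustration.

```python
import logging
import sqlite3


def distinct_provenances(conn: sqlite3.Connection):
    """Return (sorted non-empty provenances, number of rows that would be skipped)."""
    rows = conn.execute("SELECT DISTINCT provenance FROM observations").fetchall()
    provenances = sorted(row[0] for row in rows if row[0])

    # Count the affected rows so the warning says how much data is at stake.
    skipped = conn.execute(
        "SELECT COUNT(*) FROM observations "
        "WHERE provenance IS NULL OR provenance = ''"
    ).fetchone()[0]
    if skipped:
        logging.warning(
            "%d observations have a missing or empty provenance and will be "
            "skipped during export.",
            skipped,
        )
    return provenances, skipped


# Hypothetical sample data: two rows lack a usable provenance.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE observations (entity TEXT, value TEXT, provenance TEXT)")
conn.executemany(
    "INSERT INTO observations VALUES (?, ?, ?)",
    [
        ("geo/1", "10", "p1"),
        ("geo/2", "20", "p2"),
        ("geo/3", "30", ""),
        ("geo/4", "40", None),
        ("geo/5", "50", "p1"),
    ],
)
provenances, skipped = distinct_provenances(conn)
```

Whether the extra COUNT query is worth it depends on table size; the boolean check in the suggested change is cheaper and may be enough.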

    try:
        config_data = json.loads(raw_config)
    except json.JSONDecodeError as e:
        logging.error("Failed to parse JSON from %s: %s", file.full_path(), e)
        raise e
If a config.json file contains valid JSON but is not a JSON object (e.g., a JSON array or a primitive value), json.loads will succeed but subsequent .get() calls on config_data will raise an AttributeError. We should defensively verify that the parsed JSON is a dictionary.
Suggested change:

    try:
        config_data = json.loads(raw_config)
        if not isinstance(config_data, dict):
            raise ValueError("Config content must be a JSON object")
    except (json.JSONDecodeError, ValueError) as e:
        logging.error("Failed to parse JSON from %s: %s", file.full_path(), e)
        raise e

    for name in import_names:
        name = name.strip()
If import_names contains empty or whitespace-only entries (for example, after splitting "oecd," on commas), name.strip() yields an empty string. Calling base_dir.open_dir("") then opens the base directory itself, which is unintended. We should skip empty names.
Suggested change:

    for name in import_names:
        name = name.strip()
        if not name:
            continue

    try:
        imp_dir = base_dir.open_dir(name)
        file = imp_dir.open_file(constants.CONFIG_JSON_FILE_NAME, create_if_missing=False)
        configs.append(file)
    except FileNotFoundError:
        logging.info("Config file not found at root of %s. Scanning subdirectories.", name)
        sub_configs = self._find_configs_in_dir(imp_dir)
        if not sub_configs:
            raise FileNotFoundError(f"No config files found for {name}")
        configs.extend(sub_configs)
    except ValueError as e:
        logging.error("Invalid directory for import %s: %s", name, e)
        raise e
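If base_dir.open_dir(name) raises FileNotFoundError before imp_dir is assigned, referencing imp_dir in the except FileNotFoundError block raises an UnboundLocalError on the first loop iteration. On later iterations imp_dir still refers to the previous import's directory, so the wrong directory would be scanned silently. We should separate the directory opening from the file opening to ensure imp_dir is bound to the current import before it is used.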
Suggested change:

    try:
        imp_dir = base_dir.open_dir(name)
    except (FileNotFoundError, ValueError) as e:
        logging.error("Invalid or missing directory for import %s: %s", name, e)
        raise e
    try:
        file = imp_dir.open_file(constants.CONFIG_JSON_FILE_NAME, create_if_missing=False)
        configs.append(file)
    except FileNotFoundError:
        logging.info("Config file not found at root of %s. Scanning subdirectories.", name)
        sub_configs = self._find_configs_in_dir(imp_dir)
        if not sub_configs:
            raise FileNotFoundError(f"No config files found for {name}")
        configs.extend(sub_configs)
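The control flow from the last two suggestions (skip empty names, resolve the directory first, then fall back to subdirectories) can be sketched with pathlib on a temporary directory. This is only a stand-in for the PR's base_dir/open_dir/open_file abstraction: find_configs and CONFIG_NAME are hypothetical, and the directory names are made up.

```python
import logging
import tempfile
from pathlib import Path

CONFIG_NAME = "config.json"  # hypothetical stand-in for constants.CONFIG_JSON_FILE_NAME


def find_configs(base_dir: Path, import_names: list) -> list:
    """Collect one root config per import, or all configs in its subdirectories."""
    configs = []
    for name in import_names:
        name = name.strip()
        if not name:
            continue  # an empty name would otherwise resolve to base_dir itself

        # Step 1: resolve the directory on its own, so a failure here can never
        # leave a missing or stale imp_dir for the fallback below.
        imp_dir = base_dir / name
        if not imp_dir.is_dir():
            logging.error("Invalid or missing directory for import %s", name)
            raise FileNotFoundError(f"Import directory not found: {name}")

        # Step 2: prefer a config at the import root, else scan subdirectories.
        root_config = imp_dir / CONFIG_NAME
        if root_config.is_file():
            configs.append(root_config)
            continue
        logging.info("Config file not found at root of %s. Scanning subdirectories.", name)
        sub_configs = sorted(imp_dir.rglob(CONFIG_NAME))
        if not sub_configs:
            raise FileNotFoundError(f"No config files found for {name}")
        configs.extend(sub_configs)
    return configs


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    (base / "oecd").mkdir()
    (base / "oecd" / CONFIG_NAME).write_text("{}")
    (base / "who" / "2024").mkdir(parents=True)
    (base / "who" / "2024" / CONFIG_NAME).write_text("{}")
    found = [
        p.relative_to(base).as_posix()
        for p in find_configs(base, ["oecd", " who ", ""])
    ]
```

Here the empty entry is ignored, "oecd" resolves to its root config, and "who" falls back to the config found one level down.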

    # Sanitize provenance name for folder
    prov_folder = prov.replace("/", "_").replace(":", "_")
Sanitizing the provenance name by only replacing / and : is not fully robust. Provenance strings (especially URLs) can contain other characters like ?, &, =, or \, which are invalid or problematic in directory names on various filesystems (e.g., Windows). Using a regular expression to replace all non-alphanumeric characters (except underscores and dashes) is much safer and more robust.
Suggested change:

    # Sanitize provenance name for folder to be safe for all filesystems
    import re
    prov_folder = re.sub(r'[^a-zA-Z0-9_\-]', '_', prov)
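As a quick illustration of what the regex-based sanitization does, the sketch below applies the same character class to a few made-up provenance strings. sanitize_folder_name is a hypothetical helper; in the PR the import of re would more conventionally live at module level rather than inline.

```python
import re

# Anything other than ASCII letters, digits, underscore, or dash is replaced.
_UNSAFE_CHARS = re.compile(r"[^a-zA-Z0-9_-]")


def sanitize_folder_name(prov: str) -> str:
    """Map a provenance string to a name that is safe to use as a directory."""
    return _UNSAFE_CHARS.sub("_", prov)


print(sanitize_folder_name("dc/base/OECD:2024?x=1"))  # dc_base_OECD_2024_x_1

# Caveat: distinct provenances can collapse to the same folder name.
print(sanitize_folder_name("a/b") == sanitize_folder_name("a:b"))  # True
```

Because the mapping is lossy, two provenances that differ only in punctuation would share a folder. If that can occur in practice, it may be worth detecting the collision or appending a short disambiguating suffix.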

    # Merge variables
    variables = config_data.get("variables", {})
    for k, v in variables.items():
        merged_data["variables"][k] = v

    # Merge sources
    sources = config_data.get("sources", {})
    for k, v in sources.items():
        merged_data["sources"][k] = v
When merging variables and sources from multiple configuration files, conflicting definitions for the same key will silently overwrite each other. To prevent hard-to-debug configuration conflicts, we should log a warning if a key is being overwritten with a different value.
Suggested change:

    # Merge variables, warning on conflicts
    variables = config_data.get("variables", {})
    for k, v in variables.items():
        if k in merged_data["variables"] and merged_data["variables"][k] != v:
            logging.warning("Conflicting definition for variable %s. Overwriting.", k)
        merged_data["variables"][k] = v
    # Merge sources, warning on conflicts
    sources = config_data.get("sources", {})
    for k, v in sources.items():
        if k in merged_data["sources"] and merged_data["sources"][k] != v:
            logging.warning("Conflicting definition for source %s. Overwriting.", k)
        merged_data["sources"][k] = v
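Since the variables and sources loops are identical apart from the section name, the conflict check could be factored into one helper. The following is a minimal sketch under that assumption; merge_section and the sample configs are hypothetical and only mirror the shape of the data shown in the diff.

```python
import logging


def merge_section(merged: dict, incoming: dict, section: str) -> list:
    """Merge `incoming` into `merged` in place; return the keys that conflicted.

    Last writer wins, matching the behaviour in the suggested change.
    """
    conflicts = []
    for key, value in incoming.items():
        if key in merged and merged[key] != value:
            logging.warning(
                "Conflicting definition for %s entry %s. Overwriting.", section, key
            )
            conflicts.append(key)
        merged[key] = value
    return conflicts


# Hypothetical configs: both define variable "v1" with different content.
config_a = {
    "variables": {"v1": {"name": "Population"}},
    "sources": {"s1": {"url": "https://example.org/a"}},
}
config_b = {
    "variables": {"v1": {"name": "Population, total"}, "v2": {"name": "GDP"}},
}

merged_data = {"variables": {}, "sources": {}}
all_conflicts = []
for config_data in (config_a, config_b):
    for section in ("variables", "sources"):
        keys = merge_section(merged_data[section], config_data.get(section, {}), section)
        all_conflicts.extend((section, key) for key in keys)
```

Returning the conflicting keys, in addition to logging them, lets a caller decide to fail the job instead of overwriting when strict behaviour is preferred.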
No description provided.