fix(rdf): ensure Fuseki dataset before indexing and report failures correctly #27630
Commit: fix(rdf): ensure Fuseki dataset before indexing and report failures correctly

If the Fuseki dataset disappears after server startup (container restart without pod restart, ephemeral FUSEKI_BASE, external admin deletion), the RDF connection silently keeps issuing requests that come back as 404/405, producing partial knowledge graphs. Also, `RdfRepository.createOrUpdate` swallowed exceptions, so `RdfBatchProcessor` counted every failure as a success.

- Add `RdfStorageInterface.ensureStorageReady()` with a default no-op; the Fuseki implementation first tests the connection with an ASK query (works even when the admin API is disabled), attempts dataset creation if missing, verifies again, and throws a clear `IllegalStateException` otherwise.
- `RdfIndexApp.execute()` and `DistributedRdfIndexExecutor.joinJob()` call it before processing, so both the coordinator and participant paths recover transparently from a Fuseki dataset loss between runs.
- `RdfRepository.createOrUpdate()` now rethrows after logging, so the batch processor correctly increments `failedCount` instead of reporting false successes. Entity-hook callers (`RdfUpdater`) already wrap in try/catch, so API-path behavior is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The Java checkstyle check failed. Please run `…`. You can install the pre-commit hooks with `…`.
Pull request overview
Improves reliability of the RDF indexing app by proactively verifying RDF storage readiness (especially for Fuseki datasets that can disappear after startup) and by fixing failure reporting so indexing errors aren’t incorrectly counted as successes.
Changes:
- Add a new `ensureStorageReady()` hook to the RDF storage interface and implement it for Fuseki to verify/recreate datasets and reload the ontology when needed.
- Make `RdfRepository.createOrUpdate()` rethrow on failures so batch indexing can correctly count failures.
- Fail RDF indexing jobs early (both coordinator and distributed participant paths) when RDF storage is not ready, surfacing a clear failure message in job records.
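For reference, the default-method hook described above might take roughly this shape. This is a hedged sketch with stand-in names (`InMemoryStorage`, `FusekiLikeStorage` are hypothetical, not the PR's classes): the interface default is a no-op, and only storage whose backing dataset can disappear at runtime overrides it.

```java
public class DefaultHookDemo {
  // Sketch of the new contract: default no-op readiness hook.
  interface RdfStorageInterface {
    default void ensureStorageReady() {
      // no-op for implementations whose storage cannot disappear at runtime
    }
  }

  // Hypothetical implementation that keeps the default behavior.
  static class InMemoryStorage implements RdfStorageInterface {}

  // Hypothetical Fuseki-like implementation that overrides the hook.
  static class FusekiLikeStorage implements RdfStorageInterface {
    private final boolean reachable;

    FusekiLikeStorage(boolean reachable) {
      this.reachable = reachable;
    }

    @Override
    public void ensureStorageReady() {
      if (!reachable) {
        throw new IllegalStateException("dataset unreachable");
      }
    }
  }

  public static void main(String[] args) {
    new InMemoryStorage().ensureStorageReady(); // default no-op: nothing happens
    boolean threw = false;
    try {
      new FusekiLikeStorage(false).ensureStorageReady();
    } catch (IllegalStateException e) {
      threw = true;
    }
    System.out.println("default-ok, fuseki-threw=" + threw);
  }
}
```

The default method keeps non-Fuseki implementations source-compatible without touching them.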
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| openmetadata-service/src/main/java/org/openmetadata/service/rdf/storage/RdfStorageInterface.java | Adds ensureStorageReady() default no-op contract for readiness checks. |
| openmetadata-service/src/main/java/org/openmetadata/service/rdf/storage/JenaFusekiStorage.java | Stores connection config fields; implements readiness verification + best-effort dataset creation + ontology reload. |
| openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfRepository.java | Adds repository-level ensureStorageReady() and rethrows from createOrUpdate() to propagate failures. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexExecutor.java | Ensures distributed participants check RDF storage readiness before working. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/RdfIndexApp.java | Aborts indexing job early and marks FAILED when RDF storage cannot be made ready. |
Comments suppressed due to low confidence (1)
openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexExecutor.java:136
`joinJob()` now invokes `RdfRepository.getInstance().ensureStorageReady()` on the participant path, but there's no unit test asserting this readiness check happens (and that failures are handled as intended for participants). Please add coverage in `DistributedRdfIndexExecutorTest` (or participant tests) to verify `ensureStorageReady()` is called and to validate behavior when it throws.
```java
public void joinJob(RdfIndexJob job, EventPublisherJob jobConfiguration)
    throws InterruptedException {
  RdfRepository.getInstance().ensureStorageReady();
  currentJob = job;
  coordinatorOwnedJob = false;
  stopped.set(false);
  localExecutionCleaned.set(false);
  runWorkers(jobConfiguration, false);
}
```
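The behavior such a test should pin down can be modeled with stand-in types (all names here are hypothetical, not the PR's classes): a participant must check storage readiness before starting any workers, and a readiness failure must propagate and prevent work from starting.

```java
// Hypothetical stand-in for the storage readiness hook.
interface RdfStorage {
  void ensureStorageReady(); // may throw IllegalStateException when unreachable
}

// Hypothetical stand-in for the participant path.
class Participant {
  private final RdfStorage storage;
  private boolean workersStarted = false;

  Participant(RdfStorage storage) {
    this.storage = storage;
  }

  void joinJob() {
    storage.ensureStorageReady(); // readiness failures propagate before any work
    workersStarted = true;
  }

  boolean workersStarted() {
    return workersStarted;
  }
}

public class ReadinessDemo {
  public static void main(String[] args) {
    Participant participant = new Participant(() -> {
      throw new IllegalStateException("Fuseki dataset missing");
    });
    boolean threw = false;
    try {
      participant.joinJob();
    } catch (IllegalStateException e) {
      threw = true;
    }
    System.out.println("threw=" + threw + " started=" + participant.workersStarted());
  }
}
```

A real test against `DistributedRdfIndexExecutor` would need a way to stub the `RdfRepository.getInstance()` singleton (e.g. Mockito's `mockStatic`), which this sketch sidesteps by injecting the dependency.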
```java
LOG.error(
    "Failed to create/update entity {} in RDF - Type: {}, FQN: {}",
    entity.getId(),
    entity.getEntityReference().getType(),
    entity.getFullyQualifiedName(),
```
createOrUpdate now logs an error and then rethrows, but the two current callers (RdfBatchProcessor and RdfUpdater) also log on catch. This will produce duplicate error logs (and stack traces) per failure, which can be very noisy during large reindex runs. Consider logging only at one layer (e.g., remove/downgrade this LOG.error here and let callers log with entity context), or rethrow without logging and rely on upstream handling.
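One shape of the "rethrow without logging" option: wrap the failure with entity context and let the single upstream catch site log it. This is a hedged sketch with hypothetical names and messages, not the repository's actual code.

```java
public class RethrowDemo {
  // Hypothetical: no LOG.error here; wrap with entity context and rethrow,
  // so each failure is logged exactly once by the caller.
  static void createOrUpdate(String fqn) {
    try {
      throw new RuntimeException("404 Not Found"); // simulated storage failure
    } catch (RuntimeException e) {
      throw new IllegalStateException("RDF createOrUpdate failed for " + fqn, e);
    }
  }

  public static void main(String[] args) {
    try {
      createOrUpdate("service.db.table");
    } catch (IllegalStateException e) {
      // the single logging site: has both entity context and the root cause
      System.out.println(e.getMessage() + " / cause=" + e.getCause().getMessage());
    }
  }
}
```

Chaining the cause keeps the original stack trace available without duplicating log lines per failure.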
```java
public void ensureStorageReady() {
  if (testConnection()) {
    LOG.debug("Fuseki dataset at {} is accessible", endpoint);
    return;
  }

  LOG.warn(
      "Fuseki dataset at {} is not accessible; attempting to (re)create it before running",
      endpoint);
  ensureDatasetExists(endpoint, username, password);

  if (!testConnection()) {
```
ensureStorageReady() calls testConnection() twice on the unhappy path. In JenaFusekiStorage, testConnection() currently logs at ERROR with a full stack trace for expected transient/unready states, so a single readiness check can emit multiple ERROR stack traces before the method throws its own IllegalStateException. Consider reducing testConnection() logging to DEBUG/INFO (or only logging in ensureStorageReady()), so readiness probes don’t spam logs during failures/retries.
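A minimal sketch of the probe → create → re-verify flow with logging confined to one site. The functional parameters stand in for the real `testConnection()` and `ensureDatasetExists(...)`; this is an illustration of the control flow, not the PR's implementation.

```java
import java.util.function.BooleanSupplier;

public class EnsureReadyDemo {
  // Probe, attempt best-effort creation, then re-verify; the probe itself
  // stays silent so repeated checks do not spam ERROR-level stack traces.
  static void ensureStorageReady(BooleanSupplier testConnection, Runnable ensureDatasetExists) {
    if (testConnection.getAsBoolean()) {
      return; // happy path: dataset reachable, nothing to do
    }
    ensureDatasetExists.run(); // best-effort (re)creation
    if (!testConnection.getAsBoolean()) {
      throw new IllegalStateException("Fuseki dataset still unreachable after creation attempt");
    }
  }

  public static void main(String[] args) {
    boolean[] datasetExists = {false};
    // First probe fails, creation "fixes" the dataset, re-verification succeeds.
    ensureStorageReady(() -> datasetExists[0], () -> datasetExists[0] = true);
    System.out.println("recovered=" + datasetExists[0]);
  }
}
```

With this split, the two probe calls on the unhappy path carry no logging of their own; only `ensureStorageReady()` (or its caller) reports the outcome once.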
```java
try {
  rdfRepository.ensureStorageReady();
} catch (Exception e) {
  LOG.error("RDF storage is not ready; aborting indexing job", e);
  updateJobStatus(EventPublisherJob.Status.FAILED);
  jobData.setFailure(
      new IndexingError()
          .withErrorSource(IndexingError.ErrorSource.JOB)
          .withMessage("RDF storage is not ready: " + e.getMessage()));
  sendUpdates(jobExecutionContext, true);
  return;
}
```
New failure behavior was added here (abort the job and mark it FAILED when `rdfRepository.ensureStorageReady()` throws), but `RdfIndexAppTest` doesn't appear to cover it. Please add a unit test that stubs `ensureStorageReady()` to throw and asserts: status becomes FAILED, `jobData.failure.message` is set, and updates are sent (or `updateRecordToDbAndNotify` invoked) so the failure is surfaced to users.
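A self-contained sketch of the fail-fast behavior such a test should assert, using stand-in types (the real code uses `EventPublisherJob.Status`, `IndexingError`, and job records; everything here is hypothetical):

```java
public class FailFastDemo {
  enum Status { COMPLETED, FAILED }

  // Hypothetical execute(): abort and mark FAILED when the readiness check throws.
  static Status execute(Runnable ensureStorageReady) {
    try {
      ensureStorageReady.run();
    } catch (RuntimeException e) {
      return Status.FAILED; // surface the failure instead of indexing partially
    }
    // ... normal indexing would run here ...
    return Status.COMPLETED;
  }

  public static void main(String[] args) {
    Status ready = execute(() -> {});
    Status notReady = execute(() -> {
      throw new IllegalStateException("dataset missing");
    });
    System.out.println(ready + " " + notReady);
  }
}
```

A real `RdfIndexAppTest` case would additionally verify the failure message and that the job record update was sent.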
Code Review (Gitar): ✅ Approved

Ensures the Fuseki dataset exists prior to indexing and improves error reporting for failed operations. No issues found.
🔴 Playwright Results — 1 failure, 14 flaky

✅ 3696 passed · ❌ 1 failed · 🟡 14 flaky · ⏭️ 89 skipped

Genuine failures (failed on all attempts): ❌



Summary
Two related reliability fixes for the RDF indexing app, discovered while debugging a customer instance where the RDF app logged 404/405 errors for every entity yet reported them all as successes.
Issue 1 — silent data loss when the Fuseki dataset disappears.
`JenaFusekiStorage.ensureDatasetExists()` runs only in the constructor (OM server startup). If the Fuseki dataset is removed later — e.g. Fuseki container restart without pod restart when `FUSEKI_BASE` is on an ephemeral filesystem, or admin-API deletion by another process — the RDF connection silently keeps issuing SPARQL calls that come back 404 (query) / 405 (update). The indexing app keeps running and logs thousands of errors, but never recovers.

Issue 2 — false success counts.
`RdfRepository.createOrUpdate()` caught all exceptions and only logged. `RdfBatchProcessor.processEntities()` wraps the call in `try { ... successCount++; } catch { failedCount++; }`, but because `createOrUpdate` never threw, every failure was counted as a success. The app reported `failedRecords: 0` alongside thousands of server-log errors.

Changes
- `RdfStorageInterface`: new `default void ensureStorageReady()` (no-op for non-Fuseki implementations).
- `JenaFusekiStorage`: stores endpoint/username/password as fields; implements `ensureStorageReady()` by first running a SPARQL `ASK` (`testConnection()`) — this works even when the Fuseki admin API is disabled — then, on failure, attempting `ensureDatasetExists(...)` and re-verifying. Throws `IllegalStateException` with a clear message pointing at endpoint/credentials/permissions if the dataset still isn't reachable. Also reloads the ontology when the dataset had to be (re)created.
- `RdfRepository`: new `ensureStorageReady()` that delegates to the storage. `createOrUpdate()` now rethrows after logging so callers can count failures.
- `RdfIndexApp.execute()`: calls `rdfRepository.ensureStorageReady()` right after the `isEnabled()` check; on failure, marks the job FAILED with a clear error message and returns, instead of running to completion with partial data.
- `DistributedRdfIndexExecutor.joinJob()`: same call on the participant path, so distributed participants also recover between runs (and fail loudly if they can't).
- `RdfUpdater` (the REST-API-path caller) already wraps `createOrUpdate` in try/catch, so user-facing API calls are not affected by the rethrow change.

Not fixed here
This is app-level idempotency; it does not fix the underlying Fuseki persistence issue that made the dataset disappear in the first place (`FUSEKI_BASE`/admin-created datasets need to be on a PVC, and the Fuseki container should not be OOMKilled). Those are infra-side fixes that belong outside this PR.
Test plan
- With the Fuseki dataset deleted, the app now fails fast with the `RDF storage is not ready: …` message surfaced in the app run record, instead of silently reporting success.
- `openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/RdfResourceIT.java` continues to pass (the default `ensureStorageReady()` no-op doesn't affect in-memory/test-mode paths; the Fuseki path only throws on genuine unreachability).
- When `storeEntity` fails, the batch processor now correctly reports `failedRecords > 0` in the app run stats instead of `0`.
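The false-success mechanics described in Issue 2, and the effect of the rethrow fix, can be reproduced in miniature (hypothetical entity ids and messages; the counting loop mirrors the `successCount`/`failedCount` pattern described above):

```java
import java.util.List;

public class FalseSuccessDemo {
  // Hypothetical repository call: rethrows instead of swallowing, so the
  // caller's catch block actually runs when storage rejects the update.
  static void createOrUpdate(String id) {
    if (id.startsWith("bad")) {
      throw new RuntimeException("SPARQL update returned 405 for " + id);
    }
  }

  public static void main(String[] args) {
    int successCount = 0;
    int failedCount = 0;
    for (String id : List.of("ok-1", "bad-2", "ok-3")) {
      try {
        createOrUpdate(id);
        successCount++; // only reached when the call really succeeded
      } catch (RuntimeException e) {
        failedCount++; // now reachable, because failures propagate
      }
    }
    System.out.println("success=" + successCount + " failed=" + failedCount);
  }
}
```

Before the fix, the swallowed exception made every iteration take the `successCount++` path, which is exactly the `failedRecords: 0` symptom from the customer instance.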