From a586457ab36ef95a086156071e50a50d878309fe Mon Sep 17 00:00:00 2001 From: carloea2 Date: Wed, 24 Jun 2026 01:34:00 -0600 Subject: [PATCH 1/2] feat: skip completed dataset uploads --- .../service/resource/DatasetResource.scala | 53 ++++++ .../resource/DatasetResourceSpec.scala | 77 +++++++++ ...flicting-file-modal-content.component.html | 2 +- ...onflicting-file-modal-content.component.ts | 1 + .../files-uploader.component.spec.ts | 153 ++++++++++++++++++ .../files-uploader.component.ts | 96 +++++++++-- .../dataset-detail.component.html | 1 + .../user/dataset/dataset.service.spec.ts | 117 ++++++++++++++ .../service/user/dataset/dataset.service.ts | 11 ++ 9 files changed, 500 insertions(+), 11 deletions(-) create mode 100644 frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.spec.ts diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala index efed479653a..847cc7fad82 100644 --- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala +++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala @@ -202,6 +202,10 @@ object DatasetResource { sizeBytes: Option[Long] // Size of the changed file (None for directories) ) + case class ExistingUploadFile(path: String, sizeBytes: Long) + + case class ExistingUploadFilesRequest(files: List[ExistingUploadFile]) + case class DatasetDescriptionModification(did: Integer, description: String) case class DatasetNameModification(did: Integer, name: String) @@ -1030,6 +1034,55 @@ class DatasetResource extends LazyLogging { } } + @POST + @RolesAllowed(Array("REGULAR", "ADMIN")) + @Path("/{did}/existing-upload-files") + @Consumes(Array(MediaType.APPLICATION_JSON)) + def findExistingUploadFiles( + @PathParam("did") did: Integer, + request: ExistingUploadFilesRequest, + @Auth user: SessionUser + ): Response = { + val uid = user.getUid + withTransaction(context) { ctx => + if (!userHasWriteAccess(ctx, did, uid)) { + throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE) + } + + val requested = Option(request) + .map(_.files) + .getOrElse(List.empty) + .map { file => + val path = validateAndNormalizeFilePathOrThrow(file.path) + if (file.sizeBytes < 0L) throw new BadRequestException("sizeBytes must be >= 0") + path -> file.sizeBytes + } + .toMap + + val dataset = getDatasetByID(ctx, did) + val committed = getLatestDatasetVersion(ctx, did) + .map(v => + LakeFSStorageClient + .retrieveObjectsOfVersion(dataset.getRepositoryName, v.getVersionHash) + .map(obj => obj.getPath -> obj.getSizeBytes.longValue()) + ) + .getOrElse(List.empty) + + val staged = LakeFSStorageClient + .retrieveUncommittedObjects(dataset.getRepositoryName) + .filterNot(diff => Option(diff.getType).exists(_.getValue.equalsIgnoreCase("removed"))) + .flatMap(diff => Option(diff.getSizeBytes).map(size => diff.getPath -> size.longValue())) + + val existing = (committed ++ staged).toMap + val matches = requested + .collect { case (path, size) if existing.get(path).contains(size) => path } + .toList + .sorted + + Response.ok(Map("filePaths" -> matches.asJava)).build() + } + } + @PUT @RolesAllowed(Array("REGULAR", "ADMIN")) @Path("/{did}/diff") diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala index 1730d12a0a0..072489e52a9 100644 --- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala +++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala @@ -404,6 +404,83 @@ class DatasetResourceSpec dashboardDataset.size should be >= 0L } + "findExistingUploadFiles" should "match committed and staged files by path and size" in { + val repoName = s"existing-upload-${System.nanoTime()}" + val dataset = new Dataset + dataset.setName(repoName) + dataset.setRepositoryName(repoName) + dataset.setDescription("existing upload checks") + dataset.setOwnerUid(ownerUser.getUid) + dataset.setIsPublic(true) + dataset.setIsDownloadable(true) + datasetDao.insert(dataset) + LakeFSStorageClient.initRepo(repoName) + + val committed = "committed".getBytes(StandardCharsets.UTF_8) + LakeFSStorageClient.writeFileToRepo( + repoName, + "committed.csv", + new ByteArrayInputStream(committed) + ) + val commit = LakeFSStorageClient.createCommit(repoName, "main", "commit existing file") + val version = new DatasetVersion() + version.setDid(dataset.getDid) + version.setCreatorUid(ownerUser.getUid) + version.setName("v1") + version.setVersionHash(commit.getId) + new DatasetVersionDao(getDSLContext.configuration()).insert(version) + + val staged = "staged".getBytes(StandardCharsets.UTF_8) + LakeFSStorageClient.writeFileToRepo(repoName, "staged.csv", new ByteArrayInputStream(staged)) + + val resp = datasetResource.findExistingUploadFiles( + dataset.getDid, + DatasetResource.ExistingUploadFilesRequest( + List( + DatasetResource.ExistingUploadFile("committed.csv", committed.length), + DatasetResource.ExistingUploadFile("staged.csv", staged.length), + DatasetResource.ExistingUploadFile("wrong-size.csv", staged.length + 1), + DatasetResource.ExistingUploadFile("missing.csv", 1L) + ) + ), + sessionUser + ) + + resp.getStatus shouldEqual 200 + mapListOfStrings(entityAsScalaMap(resp)("filePaths")) should contain theSameElementsAs List( + "committed.csv", + "staged.csv" + ) + } + + it should "reject negative file sizes" in { + val ex = intercept[BadRequestException] { + datasetResource.findExistingUploadFiles( + baseDataset.getDid, + DatasetResource.ExistingUploadFilesRequest( + List(DatasetResource.ExistingUploadFile("bad-size.csv", -1L)) + ), + sessionUser + ) + } + + ex.getMessage should include("sizeBytes") + } + + it should "reject users without write access" in { + val ex = intercept[ForbiddenException] { + datasetResource.findExistingUploadFiles( + multipartDataset.getDid, + DatasetResource.ExistingUploadFilesRequest( + List(DatasetResource.ExistingUploadFile("private.csv", 1L)) + ), + multipartNoWriteSessionUser + ) + } + + assertStatus(ex, 403) + } + it should "surface a LakeFS 404 as NotFoundException when the dataset repo is missing" in { val dataset = new Dataset dataset.setName("get-ds-no-repo") diff --git a/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.html b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.html index 1d5f8b849ab..f2df36891ad 100644 --- a/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.html +++ b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.html @@ -29,7 +29,7 @@
Path: {{ data.path }}
Size: {{ data.size }}
-
An upload session already exists for this path.
+
{{ data.hint || "An upload session already exists for this path." }}
diff --git a/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.ts b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.ts index b418929120e..61691b59ab9 100644 --- a/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.ts +++ b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.ts @@ -24,6 +24,7 @@ export interface ConflictingFileModalData { fileName: string; path: string; size: string; + hint?: string; } @Component({ diff --git a/frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.spec.ts b/frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.spec.ts new file mode 100644 index 00000000000..98ffd50e35e --- /dev/null +++ b/frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.spec.ts @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { of } from "rxjs"; +import { NgxFileDropEntry } from "ngx-file-drop"; +import { NzModalService } from "ng-zorro-antd/modal"; +import { AdminSettingsService } from "../../../service/admin/settings/admin-settings.service"; +import { DatasetService } from "../../../service/user/dataset/dataset.service"; +import { NotificationService } from "../../../../common/service/notification/notification.service"; +import { FileUploadItem } from "../../../type/dashboard-file.interface"; +import { FilesUploaderComponent } from "./files-uploader.component"; + +interface CapturedModal { + nzTitle: string; + nzData: { + path: string; + hint?: string; + }; + nzFooter: Array<{ + label: string; + onClick: () => void; + }>; +} + +const waitUntil = async (condition: () => boolean): Promise => { + for (let i = 0; i < 20; i++) { + if (condition()) return; + await new Promise(resolve => setTimeout(resolve, 0)); + } + throw new Error("condition was not met"); +}; + +const droppedFile = (relativePath: string, file: File): NgxFileDropEntry => + ({ + relativePath, + fileEntry: { + isFile: true, + file: (success: (file: File) => void): void => success(file), + }, + }) as unknown as NgxFileDropEntry; + +describe("FilesUploaderComponent", () => { + let component: FilesUploaderComponent; + let modals: CapturedModal[]; + let datasetService: { + listMultipartUploads: ReturnType; + findExistingUploadFiles: ReturnType; + }; + + beforeEach(() => { + modals = []; + const modal = { + create: vi.fn(config => { + modals.push(config as CapturedModal); + return { destroy: vi.fn() }; + }), + } as unknown as NzModalService; + const adminSettingsService = { + getSetting: vi.fn().mockReturnValue(of("20")), + } as unknown as AdminSettingsService; + datasetService = { + listMultipartUploads: vi.fn().mockReturnValue(of(["failed.csv"])), + findExistingUploadFiles: vi.fn().mockReturnValue(of(["done.csv"])), + }; + + component = new FilesUploaderComponent( + { error: vi.fn() } as unknown as NotificationService, + adminSettingsService, + datasetService as unknown as DatasetService, + modal + ); + component.ownerEmail = "owner@example.com"; + component.datasetName = "dataset"; + component.did = 7; + }); + + it("asks to resume failed multipart files and skip completed matching files in one retry batch", async () => { + const emitted = new Promise(resolve => component.uploadedFiles.subscribe(resolve)); + + component.fileDropped([ + droppedFile("failed.csv", new File(["half"], "failed.csv")), + droppedFile("done.csv", new File(["done"], "done.csv")), + ]); + + await waitUntil(() => modals.length === 1); + expect(modals[0].nzTitle).toBe("Conflicting File"); + expect(modals[0].nzData.path).toBe("failed.csv"); + modals[0].nzFooter.find(button => button.label === "Resume")?.onClick(); + + await waitUntil(() => modals.length === 2); + expect(modals[1].nzTitle).toBe("Matching File Found"); + expect(modals[1].nzData.path).toBe("done.csv"); + expect(modals[1].nzData.hint).toContain("same path and size"); + modals[1].nzFooter.find(button => button.label === "Skip")?.onClick(); + + expect((await emitted).map(item => item.name)).toEqual(["failed.csv"]); + }); + + it("skips all matching files after one Skip For All choice", async () => { + datasetService.listMultipartUploads.mockReturnValue(of([])); + datasetService.findExistingUploadFiles.mockReturnValue(of(["one.csv", "two.csv"])); + const emitted = new Promise(resolve => component.uploadedFiles.subscribe(resolve)); + + component.fileDropped([ + droppedFile("one.csv", new File(["one"], "one.csv")), + droppedFile("two.csv", new File(["two"], "two.csv")), + ]); + + await waitUntil(() => modals.length === 1); + expect(modals[0].nzData.path).toBe("one.csv"); + modals[0].nzFooter.find(button => button.label === "Skip For All")?.onClick(); + + expect(await emitted).toEqual([]); + expect(modals).toHaveLength(1); + expect(component.fileUploadBannerType).toBe("info"); + expect(component.fileUploadBannerMessage).toContain("2 matching files were skipped."); + }); + + it("uploads all matching files after one Upload For All choice", async () => { + datasetService.listMultipartUploads.mockReturnValue(of([])); + datasetService.findExistingUploadFiles.mockReturnValue(of(["one.csv", "two.csv"])); + const emitted = new Promise(resolve => component.uploadedFiles.subscribe(resolve)); + + component.fileDropped([ + droppedFile("one.csv", new File(["one"], "one.csv")), + droppedFile("two.csv", new File(["two"], "two.csv")), + ]); + + await waitUntil(() => modals.length === 1); + expect(modals[0].nzData.path).toBe("one.csv"); + modals[0].nzFooter.find(button => button.label === "Upload For All")?.onClick(); + + expect((await emitted).map(item => item.name)).toEqual(["one.csv", "two.csv"]); + expect(modals).toHaveLength(1); + expect(component.fileUploadBannerType).toBe("success"); + }); +}); diff --git a/frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts b/frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts index 6967d46b61a..0a2f1c6c79e 100644 --- a/frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts +++ b/frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts @@ -65,6 +65,7 @@ export class FilesUploaderComponent { */ @Input() ownerEmail: string = ""; @Input() datasetName: string = ""; + @Input() did: number | undefined; @Output() uploadedFiles = new EventEmitter(); @@ -149,6 +150,41 @@ export class FilesUploaderComponent { }); } + private askUploadOrSkip( + item: FileUploadItem, + showForAll: boolean + ): Promise<"upload" | "uploadAll" | "skip" | "skipAll"> { + return new Promise(resolve => { + const fileName = item.name.split("/").pop() || item.name; + let ref: NzModalRef; + const button = (label: string, choice: "upload" | "uploadAll" | "skip" | "skipAll", type?: "primary") => ({ + label, + type, + onClick: () => { + resolve(choice); + ref.destroy(); + }, + }); + ref = this.modal.create({ + nzTitle: "Matching File Found", + nzMaskClosable: false, + nzClosable: false, + nzContent: ConflictingFileModalContentComponent, + nzData: { + fileName, + path: item.name, + size: formatSize(item.file.size), + hint: "A file with the same path and size exists in this dataset. Skip only if you expect it is the same file.", + }, + nzFooter: [ + ...(showForAll ? [button("Upload For All", "uploadAll"), button("Skip For All", "skipAll")] : []), + button("Upload", "upload"), + button("Skip", "skip", "primary"), + ], + }); + }); + } + private async resolveConflicts(items: FileUploadItem[], activePaths: string[]): Promise { const active = new Set(activePaths ?? []); const isConflict = (p: string) => active.has(p) || active.has(encodeURIComponent(p)); @@ -201,6 +237,27 @@ export class FilesUploaderComponent { return out; } + private async resolveExistingFiles(items: FileUploadItem[], existingPaths: string[]): Promise { + const existing = new Set(existingPaths ?? []); + const showForAll = items.length > 1; + let mode: "ask" | "uploadAll" | "skipAll" = "ask"; + const out: FileUploadItem[] = []; + + for (const item of items) { + if (!existing.has(item.name)) { + out.push(item); + } else if (mode === "uploadAll") { + out.push(item); + } else if (mode === "ask") { + const choice = await this.askUploadOrSkip(item, showForAll); + if (choice === "upload" || choice === "uploadAll") out.push(item); + if (choice === "uploadAll" || choice === "skipAll") mode = choice; + } + } + + return out; + } + hideBanner(): void { this.fileUploadingFinished = false; } @@ -254,20 +311,39 @@ export class FilesUploaderComponent { .then(async results => { const { ownerEmail, datasetName } = this.getOwnerAndName(); - const activePathsPromise = - ownerEmail && datasetName - ? firstValueFrom(this.datasetService.listMultipartUploads(ownerEmail, datasetName)).catch(() => []) - : []; - - const activePaths = await activePathsPromise; const successfulUploads = results .filter((r): r is PromiseFulfilledResult => r.status === "fulfilled") .map(r_1 => r_1.value) .filter((item): item is FileUploadItem => item !== null); - const filteredUploads = await this.resolveConflicts(successfulUploads, activePaths); - if (filteredUploads.length > 0) { - const msg = `${filteredUploads.length} file${filteredUploads.length > 1 ? "s" : ""} selected successfully!`; - this.showFileUploadBanner("success", msg); + + const activePathsPromise: Promise = + ownerEmail && datasetName + ? firstValueFrom(this.datasetService.listMultipartUploads(ownerEmail, datasetName)).catch(() => []) + : Promise.resolve([]); + const existingPathsPromise: Promise = this.did + ? firstValueFrom( + this.datasetService.findExistingUploadFiles( + this.did, + successfulUploads.map(item => ({ path: item.name, sizeBytes: item.file.size })) + ) + ).catch(() => []) + : Promise.resolve([]); + + const [activePaths, existingPaths] = await Promise.all([activePathsPromise, existingPathsPromise]); + const resumableUploads = await this.resolveConflicts(successfulUploads, activePaths); + const filteredUploads = await this.resolveExistingFiles(resumableUploads, existingPaths); + const skippedCount = resumableUploads.length - filteredUploads.length; + if (filteredUploads.length > 0 || skippedCount > 0) { + const messages = []; + if (filteredUploads.length > 0) { + messages.push( + `${filteredUploads.length} file${filteredUploads.length > 1 ? "s" : ""} selected successfully!` + ); + } + if (skippedCount > 0) { + messages.push(`${skippedCount} matching file${skippedCount > 1 ? "s were" : " was"} skipped.`); + } + this.showFileUploadBanner(skippedCount > 0 ? "info" : "success", messages.join(" ")); } const failedCount = results.length - successfulUploads.length; if (failedCount > 0) { diff --git a/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.html b/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.html index 75dbbaa545c..4369cd76ba6 100644 --- a/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.html +++ b/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.html @@ -338,6 +338,7 @@
Choose a Version:
nzActive="true" nzHeader="Create New Version"> diff --git a/frontend/src/app/dashboard/service/user/dataset/dataset.service.spec.ts b/frontend/src/app/dashboard/service/user/dataset/dataset.service.spec.ts index d930e50835e..229ac32c5e2 100644 --- a/frontend/src/app/dashboard/service/user/dataset/dataset.service.spec.ts +++ b/frontend/src/app/dashboard/service/user/dataset/dataset.service.spec.ts @@ -74,6 +74,48 @@ const SAMPLE_FILE_NODES: DatasetFileNode[] = [ { name: "root", type: "directory", parentDir: "", children: [] as DatasetFileNode[] } as DatasetFileNode, ]; +class FakeXMLHttpRequest { + static instances: FakeXMLHttpRequest[] = []; + + readonly upload = { + addEventListener: vi.fn(), + }; + status = 0; + url = ""; + private listeners = new Map(); + + open(_method: string, url: string): void { + this.url = url; + } + + setRequestHeader(): void {} + + send(): void { + FakeXMLHttpRequest.instances.push(this); + } + + abort(): void {} + + addEventListener(type: string, listener: EventListener): void { + this.listeners.set(type, [...(this.listeners.get(type) ?? []), listener]); + } + + respond(status: number): void { + this.status = status; + this.emit("load"); + } + + fail(): void { + this.emit("error"); + } + + private emit(type: string): void { + for (const listener of this.listeners.get(type) ?? []) { + listener(new Event(type)); + } + } +} + describe("DatasetService", () => { let service: DatasetService; let http: HttpTestingController; @@ -89,6 +131,7 @@ describe("DatasetService", () => { }); afterEach(() => { + vi.unstubAllGlobals(); http.verify(); }); @@ -222,6 +265,80 @@ describe("DatasetService", () => { // ─── finalizeMultipartUpload (abort vs finish) ──────────────────────────── + it("multipartUpload resumes a failed upload by sending only missing parts", async () => { + vi.stubGlobal("XMLHttpRequest", FakeXMLHttpRequest); + FakeXMLHttpRequest.instances = []; + const file = new File(["abcdefgh"], "resume.txt"); + const firstProgress: string[] = []; + + const firstAttempt = new Promise(resolve => { + service.multipartUpload("a@b.com", "ds", "resume.txt", file, 4, 1, false).subscribe({ + next: progress => firstProgress.push(progress.status), + error: (error: unknown): void => { + resolve(error); + }, + complete: () => resolve(undefined), + }); + }); + + http + .expectOne(r => r.url === `${API}/${DATASET_BASE_URL}/multipart-upload` && r.params.get("type") === "init") + .flush({ missingParts: [1, 2], completedPartsCount: 0 }); + + expect(FakeXMLHttpRequest.instances[0].url).toContain("partNumber=1"); + FakeXMLHttpRequest.instances[0].respond(204); + expect(FakeXMLHttpRequest.instances[1].url).toContain("partNumber=2"); + FakeXMLHttpRequest.instances[1].fail(); + + expect(await firstAttempt).toBeInstanceOf(Error); + expect(firstProgress).toContain("failed"); + + FakeXMLHttpRequest.instances = []; + const secondProgress: Array<{ percentage: number; status: string }> = []; + const secondAttempt = new Promise((resolve, reject) => { + service.multipartUpload("a@b.com", "ds", "resume.txt", file, 4, 1, false).subscribe({ + next: progress => secondProgress.push({ percentage: progress.percentage, status: progress.status }), + error: (error: unknown): void => { + reject(error); + }, + complete: resolve, + }); + }); + + http + .expectOne(r => r.url === `${API}/${DATASET_BASE_URL}/multipart-upload` && r.params.get("type") === "init") + .flush({ missingParts: [2], completedPartsCount: 1 }); + + expect(secondProgress[0]).toEqual({ percentage: 50, status: "initializing" }); + expect( + FakeXMLHttpRequest.instances.map(xhr => new URL(xhr.url, "http://localhost").searchParams.get("partNumber")) + ).toEqual(["2"]); + FakeXMLHttpRequest.instances[0].respond(204); + + http + .expectOne(r => r.url === `${API}/${DATASET_BASE_URL}/multipart-upload` && r.params.get("type") === "finish") + .flush({}); + + await secondAttempt; + expect(secondProgress.at(-1)).toEqual({ percentage: 100, status: "finished" }); + }); + + it("findExistingUploadFiles posts path and size candidates", async () => { + const pending = firstValueFrom(service.findExistingUploadFiles(7, [{ path: "a.csv", sizeBytes: 12 }])); + const req = http.expectOne(`${API}/${DATASET_BASE_URL}/7/existing-upload-files`); + expect(req.request.method).toBe("POST"); + expect(req.request.body).toEqual({ files: [{ path: "a.csv", sizeBytes: 12 }] }); + req.flush({ filePaths: ["a.csv"] }); + expect(await pending).toEqual(["a.csv"]); + }); + + it("findExistingUploadFiles tolerates a null payload", async () => { + const pending = firstValueFrom(service.findExistingUploadFiles(7, [{ path: "a.csv", sizeBytes: 12 }])); + const req = http.expectOne(`${API}/${DATASET_BASE_URL}/7/existing-upload-files`); + req.flush(null); + expect(await pending).toEqual([]); + }); + it("finalizeMultipartUpload routes through type=finish when not aborting", () => { service.finalizeMultipartUpload("a@b.com", "ds", "f", false).subscribe(); const req = http.expectOne(r => r.url === `${API}/${DATASET_BASE_URL}/multipart-upload`); diff --git a/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts b/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts index aab1f6567a8..6ec6a25aba4 100644 --- a/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts +++ b/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts @@ -420,6 +420,17 @@ export class DatasetService { .pipe(map(res => res?.filePaths ?? [])); } + public findExistingUploadFiles(did: number, files: { path: string; sizeBytes: number }[]): Observable { + return this.http + .post<{ filePaths: string[] }>( + `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/${did}/existing-upload-files`, + { + files, + } + ) + .pipe(map(res => res?.filePaths ?? [])); + } + public finalizeMultipartUpload( ownerEmail: string, datasetName: string, From 9815b969824b8f6f267b1bf748a4a2e4cb4d6fc6 Mon Sep 17 00:00:00 2001 From: carloea2 Date: Fri, 26 Jun 2026 00:57:01 -0600 Subject: [PATCH 2/2] fix(file-service): harden existing upload checks --- .../service/resource/DatasetResource.scala | 23 +++++--- .../resource/DatasetResourceSpec.scala | 53 +++++++++++++++++++ 2 files changed, 68 insertions(+), 8 deletions(-) diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala index 847cc7fad82..fcb1424240e 100644 --- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala +++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala @@ -1050,7 +1050,7 @@ class DatasetResource extends LazyLogging { } val requested = Option(request) - .map(_.files) + .flatMap(request => Option(request.files)) .getOrElse(List.empty) .map { file => val path = validateAndNormalizeFilePathOrThrow(file.path) @@ -1061,15 +1061,22 @@ class DatasetResource extends LazyLogging { val dataset = getDatasetByID(ctx, did) val committed = getLatestDatasetVersion(ctx, did) - .map(v => - LakeFSStorageClient - .retrieveObjectsOfVersion(dataset.getRepositoryName, v.getVersionHash) - .map(obj => obj.getPath -> obj.getSizeBytes.longValue()) - ) + .map { v => + withLakeFSErrorHandling( + s"retrieving committed files of dataset '${dataset.getName}'" + ) { + LakeFSStorageClient + .retrieveObjectsOfVersion(dataset.getRepositoryName, v.getVersionHash) + .map(obj => obj.getPath -> obj.getSizeBytes.longValue()) + } + } .getOrElse(List.empty) - val staged = LakeFSStorageClient - .retrieveUncommittedObjects(dataset.getRepositoryName) + val staged = withLakeFSErrorHandling( + s"retrieving staged files of dataset '${dataset.getName}'" + ) { + LakeFSStorageClient.retrieveUncommittedObjects(dataset.getRepositoryName) + } .filterNot(diff => Option(diff.getType).exists(_.getValue.equalsIgnoreCase("removed"))) .flatMap(diff => Option(diff.getSizeBytes).map(size => diff.getPath -> size.longValue())) diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala index 072489e52a9..d6af524a1ae 100644 --- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala +++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala @@ -453,6 +453,28 @@ class DatasetResourceSpec ) } + it should "treat a missing files list as empty" in { + val repoName = s"existing-upload-empty-${System.nanoTime()}" + val dataset = new Dataset + dataset.setName(repoName) + dataset.setRepositoryName(repoName) + dataset.setDescription("existing upload empty request check") + dataset.setOwnerUid(ownerUser.getUid) + dataset.setIsPublic(true) + dataset.setIsDownloadable(true) + datasetDao.insert(dataset) + LakeFSStorageClient.initRepo(repoName) + + val resp = datasetResource.findExistingUploadFiles( + dataset.getDid, + DatasetResource.ExistingUploadFilesRequest(null), + sessionUser + ) + + resp.getStatus shouldEqual 200 + mapListOfStrings(entityAsScalaMap(resp)("filePaths")) shouldBe empty + } + it should "reject negative file sizes" in { val ex = intercept[BadRequestException] { datasetResource.findExistingUploadFiles( @@ -481,6 +503,37 @@ class DatasetResourceSpec assertStatus(ex, 403) } + it should "surface a LakeFS 404 as NotFoundException when checking a missing repo" in { + val repoName = s"existing-upload-missing-repo-${System.nanoTime()}" + val dataset = new Dataset + dataset.setName(repoName) + dataset.setRepositoryName(repoName) + dataset.setDescription("existing upload missing repo check") + dataset.setOwnerUid(ownerUser.getUid) + dataset.setIsPublic(true) + dataset.setIsDownloadable(true) + datasetDao.insert(dataset) + + val version = new DatasetVersion() + version.setDid(dataset.getDid) + version.setCreatorUid(ownerUser.getUid) + version.setName("v1") + version.setVersionHash("missing-version") + new DatasetVersionDao(getDSLContext.configuration()).insert(version) + + val ex = intercept[NotFoundException] { + datasetResource.findExistingUploadFiles( + dataset.getDid, + DatasetResource.ExistingUploadFilesRequest( + List(DatasetResource.ExistingUploadFile("missing.csv", 1L)) + ), + sessionUser + ) + } + + assertStatus(ex, 404) + } + it should "surface a LakeFS 404 as NotFoundException when the dataset repo is missing" in { val dataset = new Dataset dataset.setName("get-ds-no-repo")