Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ object DatasetResource {
sizeBytes: Option[Long] // Size of the changed file (None for directories)
)

case class ExistingUploadFile(path: String, sizeBytes: Long)

case class ExistingUploadFilesRequest(files: List[ExistingUploadFile])

case class DatasetDescriptionModification(did: Integer, description: String)

case class DatasetNameModification(did: Integer, name: String)
Expand Down Expand Up @@ -1030,6 +1034,62 @@ class DatasetResource extends LazyLogging {
}
}

@POST
@RolesAllowed(Array("REGULAR", "ADMIN"))
@Path("/{did}/existing-upload-files")
@Consumes(Array(MediaType.APPLICATION_JSON))
def findExistingUploadFiles(
@PathParam("did") did: Integer,
request: ExistingUploadFilesRequest,
@Auth user: SessionUser
): Response = {
val uid = user.getUid
withTransaction(context) { ctx =>
if (!userHasWriteAccess(ctx, did, uid)) {
throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
}

val requested = Option(request)
.flatMap(request => Option(request.files))
.getOrElse(List.empty)
.map { file =>
val path = validateAndNormalizeFilePathOrThrow(file.path)
if (file.sizeBytes < 0L) throw new BadRequestException("sizeBytes must be >= 0")
path -> file.sizeBytes
}
.toMap

val dataset = getDatasetByID(ctx, did)
val committed = getLatestDatasetVersion(ctx, did)
.map { v =>
withLakeFSErrorHandling(
s"retrieving committed files of dataset '${dataset.getName}'"
) {
LakeFSStorageClient
.retrieveObjectsOfVersion(dataset.getRepositoryName, v.getVersionHash)
.map(obj => obj.getPath -> obj.getSizeBytes.longValue())
}
}
.getOrElse(List.empty)

val staged = withLakeFSErrorHandling(
s"retrieving staged files of dataset '${dataset.getName}'"
) {
LakeFSStorageClient.retrieveUncommittedObjects(dataset.getRepositoryName)
}
.filterNot(diff => Option(diff.getType).exists(_.getValue.equalsIgnoreCase("removed")))
.flatMap(diff => Option(diff.getSizeBytes).map(size => diff.getPath -> size.longValue()))

val existing = (committed ++ staged).toMap
val matches = requested
.collect { case (path, size) if existing.get(path).contains(size) => path }
.toList
.sorted

Response.ok(Map("filePaths" -> matches.asJava)).build()
}
}

@PUT
@RolesAllowed(Array("REGULAR", "ADMIN"))
@Path("/{did}/diff")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,136 @@ class DatasetResourceSpec
dashboardDataset.size should be >= 0L
}

"findExistingUploadFiles" should "match committed and staged files by path and size" in {
val repoName = s"existing-upload-${System.nanoTime()}"
val dataset = new Dataset
dataset.setName(repoName)
dataset.setRepositoryName(repoName)
dataset.setDescription("existing upload checks")
dataset.setOwnerUid(ownerUser.getUid)
dataset.setIsPublic(true)
dataset.setIsDownloadable(true)
datasetDao.insert(dataset)
LakeFSStorageClient.initRepo(repoName)

val committed = "committed".getBytes(StandardCharsets.UTF_8)
LakeFSStorageClient.writeFileToRepo(
repoName,
"committed.csv",
new ByteArrayInputStream(committed)
)
val commit = LakeFSStorageClient.createCommit(repoName, "main", "commit existing file")
val version = new DatasetVersion()
version.setDid(dataset.getDid)
version.setCreatorUid(ownerUser.getUid)
version.setName("v1")
version.setVersionHash(commit.getId)
new DatasetVersionDao(getDSLContext.configuration()).insert(version)

val staged = "staged".getBytes(StandardCharsets.UTF_8)
LakeFSStorageClient.writeFileToRepo(repoName, "staged.csv", new ByteArrayInputStream(staged))

val resp = datasetResource.findExistingUploadFiles(
dataset.getDid,
DatasetResource.ExistingUploadFilesRequest(
List(
DatasetResource.ExistingUploadFile("committed.csv", committed.length),
DatasetResource.ExistingUploadFile("staged.csv", staged.length),
DatasetResource.ExistingUploadFile("wrong-size.csv", staged.length + 1),
DatasetResource.ExistingUploadFile("missing.csv", 1L)
)
),
sessionUser
)

resp.getStatus shouldEqual 200
mapListOfStrings(entityAsScalaMap(resp)("filePaths")) should contain theSameElementsAs List(
"committed.csv",
"staged.csv"
)
}

it should "treat a missing files list as empty" in {
val repoName = s"existing-upload-empty-${System.nanoTime()}"
val dataset = new Dataset
dataset.setName(repoName)
dataset.setRepositoryName(repoName)
dataset.setDescription("existing upload empty request check")
dataset.setOwnerUid(ownerUser.getUid)
dataset.setIsPublic(true)
dataset.setIsDownloadable(true)
datasetDao.insert(dataset)
LakeFSStorageClient.initRepo(repoName)

val resp = datasetResource.findExistingUploadFiles(
dataset.getDid,
DatasetResource.ExistingUploadFilesRequest(null),
sessionUser
)

resp.getStatus shouldEqual 200
mapListOfStrings(entityAsScalaMap(resp)("filePaths")) shouldBe empty
}

it should "reject negative file sizes" in {
val ex = intercept[BadRequestException] {
datasetResource.findExistingUploadFiles(
baseDataset.getDid,
DatasetResource.ExistingUploadFilesRequest(
List(DatasetResource.ExistingUploadFile("bad-size.csv", -1L))
),
sessionUser
)
}

ex.getMessage should include("sizeBytes")
}

it should "reject users without write access" in {
val ex = intercept[ForbiddenException] {
datasetResource.findExistingUploadFiles(
multipartDataset.getDid,
DatasetResource.ExistingUploadFilesRequest(
List(DatasetResource.ExistingUploadFile("private.csv", 1L))
),
multipartNoWriteSessionUser
)
}

assertStatus(ex, 403)
}

it should "surface a LakeFS 404 as NotFoundException when checking a missing repo" in {
val repoName = s"existing-upload-missing-repo-${System.nanoTime()}"
val dataset = new Dataset
dataset.setName(repoName)
dataset.setRepositoryName(repoName)
dataset.setDescription("existing upload missing repo check")
dataset.setOwnerUid(ownerUser.getUid)
dataset.setIsPublic(true)
dataset.setIsDownloadable(true)
datasetDao.insert(dataset)

val version = new DatasetVersion()
version.setDid(dataset.getDid)
version.setCreatorUid(ownerUser.getUid)
version.setName("v1")
version.setVersionHash("missing-version")
new DatasetVersionDao(getDSLContext.configuration()).insert(version)

val ex = intercept[NotFoundException] {
datasetResource.findExistingUploadFiles(
dataset.getDid,
DatasetResource.ExistingUploadFilesRequest(
List(DatasetResource.ExistingUploadFile("missing.csv", 1L))
),
sessionUser
)
}

assertStatus(ex, 404)
}

it should "surface a LakeFS 404 as NotFoundException when the dataset repo is missing" in {
val dataset = new Dataset
dataset.setName("get-ds-no-repo")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<div><b>Path:</b> {{ data.path }}</div>
<div><b>Size:</b> {{ data.size }}</div>

<div class="hint">An upload session already exists for this path.</div>
<div class="hint">{{ data.hint || "An upload session already exists for this path." }}</div>
</div>
</body>
</html>
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export interface ConflictingFileModalData {
fileName: string;
path: string;
size: string;
hint?: string;
}

@Component({
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { of } from "rxjs";
import { NgxFileDropEntry } from "ngx-file-drop";
import { NzModalService } from "ng-zorro-antd/modal";
import { AdminSettingsService } from "../../../service/admin/settings/admin-settings.service";
import { DatasetService } from "../../../service/user/dataset/dataset.service";
import { NotificationService } from "../../../../common/service/notification/notification.service";
import { FileUploadItem } from "../../../type/dashboard-file.interface";
import { FilesUploaderComponent } from "./files-uploader.component";

interface CapturedModal {
nzTitle: string;
nzData: {
path: string;
hint?: string;
};
nzFooter: Array<{
label: string;
onClick: () => void;
}>;
}

const waitUntil = async (condition: () => boolean): Promise<void> => {
for (let i = 0; i < 20; i++) {
if (condition()) return;
await new Promise(resolve => setTimeout(resolve, 0));
}
throw new Error("condition was not met");
};

const droppedFile = (relativePath: string, file: File): NgxFileDropEntry =>
({
relativePath,
fileEntry: {
isFile: true,
file: (success: (file: File) => void): void => success(file),
},
}) as unknown as NgxFileDropEntry;

describe("FilesUploaderComponent", () => {
let component: FilesUploaderComponent;
let modals: CapturedModal[];
let datasetService: {
listMultipartUploads: ReturnType<typeof vi.fn>;
findExistingUploadFiles: ReturnType<typeof vi.fn>;
};

beforeEach(() => {
modals = [];
const modal = {
create: vi.fn(config => {
modals.push(config as CapturedModal);
return { destroy: vi.fn() };
}),
} as unknown as NzModalService;
const adminSettingsService = {
getSetting: vi.fn().mockReturnValue(of("20")),
} as unknown as AdminSettingsService;
datasetService = {
listMultipartUploads: vi.fn().mockReturnValue(of(["failed.csv"])),
findExistingUploadFiles: vi.fn().mockReturnValue(of(["done.csv"])),
};

component = new FilesUploaderComponent(
{ error: vi.fn() } as unknown as NotificationService,
adminSettingsService,
datasetService as unknown as DatasetService,
modal
);
component.ownerEmail = "owner@example.com";
component.datasetName = "dataset";
component.did = 7;
});

it("asks to resume failed multipart files and skip completed matching files in one retry batch", async () => {
const emitted = new Promise<FileUploadItem[]>(resolve => component.uploadedFiles.subscribe(resolve));

component.fileDropped([
droppedFile("failed.csv", new File(["half"], "failed.csv")),
droppedFile("done.csv", new File(["done"], "done.csv")),
]);

await waitUntil(() => modals.length === 1);
expect(modals[0].nzTitle).toBe("Conflicting File");
expect(modals[0].nzData.path).toBe("failed.csv");
modals[0].nzFooter.find(button => button.label === "Resume")?.onClick();

await waitUntil(() => modals.length === 2);
expect(modals[1].nzTitle).toBe("Matching File Found");
expect(modals[1].nzData.path).toBe("done.csv");
expect(modals[1].nzData.hint).toContain("same path and size");
modals[1].nzFooter.find(button => button.label === "Skip")?.onClick();

expect((await emitted).map(item => item.name)).toEqual(["failed.csv"]);
});

it("skips all matching files after one Skip For All choice", async () => {
datasetService.listMultipartUploads.mockReturnValue(of([]));
datasetService.findExistingUploadFiles.mockReturnValue(of(["one.csv", "two.csv"]));
const emitted = new Promise<FileUploadItem[]>(resolve => component.uploadedFiles.subscribe(resolve));

component.fileDropped([
droppedFile("one.csv", new File(["one"], "one.csv")),
droppedFile("two.csv", new File(["two"], "two.csv")),
]);

await waitUntil(() => modals.length === 1);
expect(modals[0].nzData.path).toBe("one.csv");
modals[0].nzFooter.find(button => button.label === "Skip For All")?.onClick();

expect(await emitted).toEqual([]);
expect(modals).toHaveLength(1);
expect(component.fileUploadBannerType).toBe("info");
expect(component.fileUploadBannerMessage).toContain("2 matching files were skipped.");
});

it("uploads all matching files after one Upload For All choice", async () => {
datasetService.listMultipartUploads.mockReturnValue(of([]));
datasetService.findExistingUploadFiles.mockReturnValue(of(["one.csv", "two.csv"]));
const emitted = new Promise<FileUploadItem[]>(resolve => component.uploadedFiles.subscribe(resolve));

component.fileDropped([
droppedFile("one.csv", new File(["one"], "one.csv")),
droppedFile("two.csv", new File(["two"], "two.csv")),
]);

await waitUntil(() => modals.length === 1);
expect(modals[0].nzData.path).toBe("one.csv");
modals[0].nzFooter.find(button => button.label === "Upload For All")?.onClick();

expect((await emitted).map(item => item.name)).toEqual(["one.csv", "two.csv"]);
expect(modals).toHaveLength(1);
expect(component.fileUploadBannerType).toBe("success");
});
});
Loading
Loading