diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala index efed479653a..fcb1424240e 100644 --- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala +++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala @@ -202,6 +202,10 @@ object DatasetResource { sizeBytes: Option[Long] // Size of the changed file (None for directories) ) + case class ExistingUploadFile(path: String, sizeBytes: Long) + + case class ExistingUploadFilesRequest(files: List[ExistingUploadFile]) + case class DatasetDescriptionModification(did: Integer, description: String) case class DatasetNameModification(did: Integer, name: String) @@ -1030,6 +1034,62 @@ class DatasetResource extends LazyLogging { } } + @POST + @RolesAllowed(Array("REGULAR", "ADMIN")) + @Path("/{did}/existing-upload-files") + @Consumes(Array(MediaType.APPLICATION_JSON)) + def findExistingUploadFiles( + @PathParam("did") did: Integer, + request: ExistingUploadFilesRequest, + @Auth user: SessionUser + ): Response = { + val uid = user.getUid + withTransaction(context) { ctx => + if (!userHasWriteAccess(ctx, did, uid)) { + throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE) + } + + val requested = Option(request) + .flatMap(request => Option(request.files)) + .getOrElse(List.empty) + .map { file => + val path = validateAndNormalizeFilePathOrThrow(file.path) + if (file.sizeBytes < 0L) throw new BadRequestException("sizeBytes must be >= 0") + path -> file.sizeBytes + } + .toMap + + val dataset = getDatasetByID(ctx, did) + val committed = getLatestDatasetVersion(ctx, did) + .map { v => + withLakeFSErrorHandling( + s"retrieving committed files of dataset '${dataset.getName}'" + ) { + LakeFSStorageClient + .retrieveObjectsOfVersion(dataset.getRepositoryName, v.getVersionHash) + .map(obj => obj.getPath -> obj.getSizeBytes.longValue()) + } + } + .getOrElse(List.empty) + + val staged = withLakeFSErrorHandling( + s"retrieving staged files of dataset '${dataset.getName}'" + ) { + LakeFSStorageClient.retrieveUncommittedObjects(dataset.getRepositoryName) + } + .filterNot(diff => Option(diff.getType).exists(_.getValue.equalsIgnoreCase("removed"))) + .flatMap(diff => Option(diff.getSizeBytes).map(size => diff.getPath -> size.longValue())) + + val existing = (committed ++ staged).toMap + val matches = requested + .collect { case (path, size) if existing.get(path).contains(size) => path } + .toList + .sorted + + Response.ok(Map("filePaths" -> matches.asJava)).build() + } + } + @PUT @RolesAllowed(Array("REGULAR", "ADMIN")) @Path("/{did}/diff") diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala index 1730d12a0a0..d6af524a1ae 100644 --- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala +++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala @@ -404,6 +404,136 @@ class DatasetResourceSpec dashboardDataset.size should be >= 0L } + "findExistingUploadFiles" should "match committed and staged files by path and size" in { + val repoName = s"existing-upload-${System.nanoTime()}" + val dataset = new Dataset + dataset.setName(repoName) + dataset.setRepositoryName(repoName) + dataset.setDescription("existing upload checks") + dataset.setOwnerUid(ownerUser.getUid) + dataset.setIsPublic(true) + dataset.setIsDownloadable(true) + datasetDao.insert(dataset) + LakeFSStorageClient.initRepo(repoName) + + val committed = "committed".getBytes(StandardCharsets.UTF_8) + LakeFSStorageClient.writeFileToRepo( + repoName, + "committed.csv", + new ByteArrayInputStream(committed) + ) + val commit = LakeFSStorageClient.createCommit(repoName, "main", "commit existing file") + val version = new DatasetVersion() + version.setDid(dataset.getDid) + version.setCreatorUid(ownerUser.getUid) + version.setName("v1") + version.setVersionHash(commit.getId) + new DatasetVersionDao(getDSLContext.configuration()).insert(version) + + val staged = "staged".getBytes(StandardCharsets.UTF_8) + LakeFSStorageClient.writeFileToRepo(repoName, "staged.csv", new ByteArrayInputStream(staged)) + + val resp = datasetResource.findExistingUploadFiles( + dataset.getDid, + DatasetResource.ExistingUploadFilesRequest( + List( + DatasetResource.ExistingUploadFile("committed.csv", committed.length), + DatasetResource.ExistingUploadFile("staged.csv", staged.length), + DatasetResource.ExistingUploadFile("wrong-size.csv", staged.length + 1), + DatasetResource.ExistingUploadFile("missing.csv", 1L) + ) + ), + sessionUser + ) + + resp.getStatus shouldEqual 200 + mapListOfStrings(entityAsScalaMap(resp)("filePaths")) should contain theSameElementsAs List( + "committed.csv", + "staged.csv" + ) + } + + it should "treat a missing files list as empty" in { + val repoName = s"existing-upload-empty-${System.nanoTime()}" + val dataset = new Dataset + dataset.setName(repoName) + dataset.setRepositoryName(repoName) + dataset.setDescription("existing upload empty request check") + dataset.setOwnerUid(ownerUser.getUid) + dataset.setIsPublic(true) + dataset.setIsDownloadable(true) + datasetDao.insert(dataset) + LakeFSStorageClient.initRepo(repoName) + + val resp = datasetResource.findExistingUploadFiles( + dataset.getDid, + DatasetResource.ExistingUploadFilesRequest(null), + sessionUser + ) + + resp.getStatus shouldEqual 200 + mapListOfStrings(entityAsScalaMap(resp)("filePaths")) shouldBe empty + } + + it should "reject negative file sizes" in { + val ex = intercept[BadRequestException] { + datasetResource.findExistingUploadFiles( + baseDataset.getDid, + DatasetResource.ExistingUploadFilesRequest( + List(DatasetResource.ExistingUploadFile("bad-size.csv", -1L)) + ), + sessionUser + ) + } + + ex.getMessage should include("sizeBytes") + } + + it should "reject users without write access" in { + val ex = intercept[ForbiddenException] { + datasetResource.findExistingUploadFiles( + multipartDataset.getDid, + DatasetResource.ExistingUploadFilesRequest( + List(DatasetResource.ExistingUploadFile("private.csv", 1L)) + ), + multipartNoWriteSessionUser + ) + } + + assertStatus(ex, 403) + } + + it should "surface a LakeFS 404 as NotFoundException when checking a missing repo" in { + val repoName = s"existing-upload-missing-repo-${System.nanoTime()}" + val dataset = new Dataset + dataset.setName(repoName) + dataset.setRepositoryName(repoName) + dataset.setDescription("existing upload missing repo check") + dataset.setOwnerUid(ownerUser.getUid) + dataset.setIsPublic(true) + dataset.setIsDownloadable(true) + datasetDao.insert(dataset) + + val version = new DatasetVersion() + version.setDid(dataset.getDid) + version.setCreatorUid(ownerUser.getUid) + version.setName("v1") + version.setVersionHash("missing-version") + new DatasetVersionDao(getDSLContext.configuration()).insert(version) + + val ex = intercept[NotFoundException] { + datasetResource.findExistingUploadFiles( + dataset.getDid, + DatasetResource.ExistingUploadFilesRequest( + List(DatasetResource.ExistingUploadFile("missing.csv", 1L)) + ), + sessionUser + ) + } + + assertStatus(ex, 404) + } + it should "surface a LakeFS 404 as NotFoundException when the dataset repo is missing" in { val dataset = new Dataset dataset.setName("get-ds-no-repo") diff --git a/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.html b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.html index 1d5f8b849ab..f2df36891ad 100644 --- a/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.html +++ b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.html @@ -29,7 +29,7 @@