diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala index 0399e386ba7..27a3b7be7c1 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala @@ -32,7 +32,8 @@ import org.apache.texera.amber.config.PythonUtils * for each Computing Unit * * It supports: - * - Creating and initializing isolated Python environments + * - Creating and initializing isolated Python environments (with system packages) + * - Installing user defined packages * - Streaming pip output logs back to the caller * * Each PVE is stored under: @@ -41,6 +42,11 @@ import org.apache.texera.amber.config.PythonUtils object PveManager { + case class PvePackageResponse( + pveName: String, + userPackages: Seq[String] + ) + private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs") private def cuidDir(cuid: Int, pveName: String): Path = { @@ -61,9 +67,63 @@ object PveManager { "PIP_NO_INPUT" -> "1" ) - def getSystemPackages(): Seq[String] = { - val python = PythonUtils.getPythonExecutable - Process(Seq(python, "-m", "pip", "freeze")).!!.split("\n").map(_.trim).filter(_.nonEmpty).toSeq + private def readPackageFile(path: Path): Seq[String] = { + if (Files.exists(path)) { + Files + .readAllLines(path) + .asScala + .map(_.trim) + .filter(_.nonEmpty) + .toSeq + } else { + Seq() + } + } + + private def getSystemPath(isLocal: Boolean): Path = { + Paths.get( + if (isLocal) "amber/system-requirements-lock.txt" + else "/tmp/system-requirements-lock.txt" + ) + } + + def getSystemPackages(isLocal: Boolean): Seq[String] = { + if (!Files.exists(getSystemPath(isLocal))) { + Seq() + } else { + Files + .readAllLines(getSystemPath(isLocal)) + .asScala + .map(_.trim) + .filter(line => line.nonEmpty && !line.startsWith("#")) + .toSeq + } + } + + private def runPipInstall( + python: String, + args: Seq[String], + queue: BlockingQueue[String] + ): Int = { + Process( + Seq( + python, + "-u", + "-m", + "pip", + "install", + "--progress-bar", + "off", + "--no-input" + ) ++ args, + None, + pipEnv.toSeq: _* + ).!( + ProcessLogger( + out => queue.put(s"[pip] $out"), + err => queue.put(s"[pip][ERR] $err") + ) + ) } /** @@ -85,23 +145,18 @@ object PveManager { queue.put(s"[PVE] Creating new PVE for cuid: $cuid with name: $pveName") // NOTE: These paths are derived from computing-unit-master.dockerfile. - // If requirements.txt or operator-requirements.txt locations change, update these paths. + // If requirements.txt location changes, update these paths. val requirementsPath = if (isLocal) Paths.get("amber", "requirements.txt") else Paths.get("/tmp", "requirements.txt") - val operatorRequirementsPath = - if (isLocal) Paths.get("amber", "operator-requirements.txt") - else Paths.get("/tmp", "operator-requirements.txt") - - if (!Files.exists(requirementsPath) || !Files.exists(operatorRequirementsPath)) { + if (!Files.exists(requirementsPath)) { queue.put(s"[PVE][ERR] System requirements not found") return } val venvDirPath = pveDir(cuid, pveName).toAbsolutePath val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString - val envVars = pipEnv val createVenvPython = PythonUtils.getPythonExecutable @@ -121,43 +176,17 @@ object PveManager { return } - if (!Files.exists(requirementsPath)) { - queue.put(s"[PVE][ERR] requirements.txt not found at ${requirementsPath.toAbsolutePath}") - return - } - - if (!Files.exists(operatorRequirementsPath)) { - queue.put( - s"[PVE][ERR] operator-requirements.txt not found at ${operatorRequirementsPath.toAbsolutePath}" - ) - return - } - queue.put( - s"[PVE] Installing requirements from ${requirementsPath.toAbsolutePath} and ${operatorRequirementsPath.toAbsolutePath}" + s"[PVE] Installing requirements from ${requirementsPath.toAbsolutePath}" ) - val installReqCode = Process( + val installReqCode = runPipInstall( + python, Seq( - python, - "-u", - "-m", - "pip", - "install", - "--progress-bar", - "off", "-r", - requirementsPath.toString, - "-r", - operatorRequirementsPath.toString + requirementsPath.toString ), - None, - envVars.toSeq: _* - ).!( - ProcessLogger( - out => queue.put(s"[pip] $out"), - err => queue.put(s"[pip][ERR] $err") - ) + queue ) queue.put(s"[PVE] requirements install finished with exit code $installReqCode") @@ -170,7 +199,8 @@ object PveManager { queue.put(s"[PVE] Created new environment for cuid = $cuid") } - def getEnvironments(cuid: Int): List[String] = { + // returns list of PVE names and corresponding user packages for a given CU + def getEnvironments(cuid: Int): List[PvePackageResponse] = { val cuPath = VenvRoot.resolve(cuid.toString) @@ -185,7 +215,17 @@ object PveManager { .iterator() .asScala .filter(path => Files.isDirectory(path)) - .map(path => path.getFileName.toString) + .map { path => + val pveName = path.getFileName.toString + val metadataPath = path.resolve("user-packages.txt") + + val userPackages = readPackageFile(metadataPath) + + PvePackageResponse( + pveName = pveName, + userPackages = userPackages + ) + } .toList } finally { stream.close() @@ -212,4 +252,93 @@ object PveManager { stream.close() } } + + /** + * Installs user requested Python packages into the PVE. + * + * 1. Executes pip install for each package + * 2. Prevents conflicts with system dependencies. + * 3. Updates user metadata file + * 4. Streams logs back via queue + */ + def installUserPackages( + packages: List[String], + cuid: Int, + queue: BlockingQueue[String], + pveName: String, + isLocal: Boolean + ): Unit = { + + val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString + + if (!Files.exists(Paths.get(python))) { + queue.put(s"[PVE][ERR] Python executable not found for PVE: $python") + return + } + + val metadataPath = cuidDir(cuid, pveName).resolve("user-packages.txt") + + var installedPackages = readPackageFile(metadataPath).toSet + + val systemPackages = + if (Files.exists(getSystemPath(isLocal))) { + Files + .readAllLines(getSystemPath(isLocal)) + .asScala + .map(_.trim) + .filter(line => line.nonEmpty && !line.startsWith("#")) + .map(line => line.split("==")(0).trim.toLowerCase) + .toSet + } else { + Set[String]() + } + + packages.foreach { pkg => + val trimmedPkg = pkg.trim + + if (trimmedPkg.nonEmpty) { + + val userPackageName = trimmedPkg.split("==")(0).trim.toLowerCase + + if (systemPackages.contains(userPackageName)) { + queue.put( + s"[PVE][ERR] $trimmedPkg is a system package and cannot be installed or modified by the user." + ) + return + } + + queue.put(s"[PVE] Installing package: $trimmedPkg") + + val code = runPipInstall( + python, + Seq( + "--constraint", // check against system-requirements-lock + getSystemPath(isLocal).toString, + trimmedPkg + ), + queue + ) + + queue.put(s"[pip] install($trimmedPkg) finished with exit code $code") + + if (code != 0) { + queue.put(s"[PVE][ERR] Failed to install package: $trimmedPkg") + return + } + + installedPackages = installedPackages + trimmedPkg + + Files.write( + metadataPath, + installedPackages.toSeq.sorted.asJava + ) + } + } + + queue.put("[PVE] Final user package list:") + + installedPackages.toSeq.sorted.foreach { pkg => + queue.put(s"[user-package] $pkg") + } + } } diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala index 1040fd64ea4..8a6f4875293 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala @@ -28,24 +28,32 @@ import java.util @Consumes(Array(MediaType.APPLICATION_JSON)) class PveResource { // -------------------------------------------------- - // Get installed packages + // Get system packages // -------------------------------------------------- @GET @Path("/system") @Produces(Array(MediaType.APPLICATION_JSON)) def getSystemPackages: util.Map[String, util.List[String]] = { try { - val systemPkgs = PveManager.getSystemPackages().toList.asJava + + // TODO: Support Kubernetes environment handling + val isLocal = true + + val systemPkgs = + PveManager.getSystemPackages(isLocal).toList.asJava + Map("system" -> systemPkgs).asJava } catch { case e: Exception => e.printStackTrace() - throw new InternalServerErrorException("Failed to get system packages.") + throw new InternalServerErrorException( + "Failed to get system packages." + ) } } // -------------------------------------------------- - // Fetch PVEs + // Fetch PVEs and Installed User Packages // -------------------------------------------------- @GET @Path("/pves") @@ -54,9 +62,10 @@ class PveResource { try { PveManager .getEnvironments(cuid) - .map { pveName => + .map { pve => Map( - "pveName" -> pveName.asInstanceOf[Object] + "pveName" -> pve.pveName.asInstanceOf[Object], + "userPackages" -> pve.userPackages.asJava.asInstanceOf[Object] ).asJava } .asJava diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala index b93d1bfde03..e21f91fada3 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala @@ -26,9 +26,9 @@ import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global /** - * WebSocket endpoint for PVE creation that streams pip installation logs - * to the frontend in real time. The environment setup runs asynchronously, - * and output is pushed to the client until completion. + * WebSocket endpoint for PVE creation and user package installation that streams + * pip installation logs to the frontend in real time. The environment setup runs + * asynchronously, and output is pushed to the client until completion. */ @ServerEndpoint("/wsapi/pve") @@ -42,12 +42,33 @@ class PveWebsocketResource { val cuid = params.get("cuid").get(0).toInt val pveName = params.get("pveName").get(0) val isLocal = params.get("isLocal").get(0).toBoolean + val action = params.getOrDefault("action", java.util.List.of("create")).get(0) val queue = new LinkedBlockingQueue[String]() Future { try { - PveManager.createNewPve(cuid, queue, pveName, isLocal) + action match { + case "create" => + PveManager.createNewPve(cuid, queue, pveName, isLocal) + + case "install" => + val packages = + params + .getOrDefault("packages", java.util.List.of("[]")) + .get(0) + .stripPrefix("[") + .stripSuffix("]") + .split(",") + .toList + .map(_.replace("\"", "").trim) + .filter(_.nonEmpty) + + PveManager.installUserPackages(packages, cuid, queue, pveName, isLocal) + + case _ => + queue.put(s"[ERR] Unknown action: $action") + } } catch { case e: Exception => queue.put(s"[ERR] ${e.getMessage}") @@ -61,7 +82,6 @@ class PveWebsocketResource { while (!done && session.isOpen) { val line = queue.take() - session.getBasicRemote.sendText(line) if (line == "__DONE__") { diff --git a/amber/src/test/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResourceSpec.scala b/amber/src/test/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResourceSpec.scala index a093cf1ad2f..10e952c8bdc 100644 --- a/amber/src/test/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResourceSpec.scala +++ b/amber/src/test/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResourceSpec.scala @@ -64,7 +64,38 @@ class PveResourceSpec extends AnyFlatSpec with Matchers with BeforeAndAfterEach Files.exists(pythonPath) shouldBe true Files.exists(pipPath) shouldBe true - PveManager.getEnvironments(testCuid) should contain(testPveName) + PveManager.getEnvironments(testCuid).map(_.pveName) should contain(testPveName) + } + + "PveManager" should "install a user package and list it for the PVE" in { + PveManager.createNewPve(testCuid, queue, testPveName, isLocal = true) + + val packageName = "colorama" + val packageVersion = "0.4.6" + val packageSpec = s"$packageName==$packageVersion" + + queue.clear() + + PveManager.installUserPackages( + List(packageSpec), + testCuid, + queue, + testPveName, + isLocal = true + ) + + val logs = queueText() + + logs should not include "[PVE][ERR]" + logs should include(s"[PVE] Installing package: $packageSpec") + logs should include(s"[user-package] $packageSpec") + + val pve = PveManager + .getEnvironments(testCuid) + .find(_.pveName == testPveName) + + pve should not be empty + pve.get.userPackages should contain(packageSpec) } "PveManager" should "delete all PVEs for a computing unit" in { diff --git a/amber/system-requirements-lock.txt b/amber/system-requirements-lock.txt new file mode 100644 index 00000000000..67ea725c88c --- /dev/null +++ b/amber/system-requirements-lock.txt @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This file is manually generated to track system packages used in PVEs. +# NOTE: This file must be updated whenever requirements.txt or +# operator-requirements.txt changes. + +aiohappyeyeballs==2.6.1 +aiohttp==3.13.5 +aioitertools==0.13.0 +aiobotocore==2.25.1 +aiosignal==1.4.0 +annotated-types==0.7.0 +appdirs==1.4.4 +asn1crypto==1.5.1 +attrs==26.1.0 +betterproto==2.0.0b7 +bidict==0.22.0 +boto3==1.40.53 +botocore==1.40.53 +cached_property==1.5.2 +cachetools==6.2.6 +certifi==2026.4.22 +charset_normalizer==3.4.7 +click==8.3.3 +Deprecated==1.2.14 +frozenlist==1.8.0 +fs==2.4.16 +fsspec==2025.9.0 +grpclib==0.4.9 +h2==4.3.0 +hpack==4.1.0 +hyperframe==6.1.0 +idna==3.14 +iniconfig==1.1.1 +jmespath==1.1.0 +loguru==0.7.0 +markdown-it-py==4.2.0 +mdurl==0.1.2 +mmh3==5.2.1 +multidict==6.7.1 +numpy==2.1.0 +overrides==7.4.0 +packaging==26.2 +pampy==0.3.0 +pandas==2.2.3 +pg8000==1.31.5 +pluggy==1.6.0 +praw==7.6.1 +prawcore==2.4.0 +propcache==0.5.2 +protobuf==7.34.1 +psutil==5.9.0 +pyarrow==21.0.0 +pydantic==2.13.4 +pydantic-core==2.46.4 +pygments==2.20.0 +pyiceberg==0.11.1 +pympler==1.1 +pyparsing==3.3.2 +pyroaring==1.1.0 +pytest==7.4.0 +pytest-reraise==2.1.2 +pytest-timeout==2.2.0 +python-dateutil==2.8.2 +pytz==2026.2 +readerwriterlock==1.0.9 +requests==2.34.0 +rich==14.3.4 +ruff==0.14.7 +s3fs==2025.9.0 +s3transfer==0.14.0 +scramp==1.4.8 +setuptools==80.10.2 +six==1.17.0 +SQLAlchemy==2.0.37 +strictyaml==1.7.3 +tenacity==8.5.0 +typing-inspection==0.4.2 +typing_extensions==4.14.1 +tzdata==2026.2 +tzlocal==2.1 +update-checker==0.18.0 +urllib3==2.7.0 +websocket-client==1.9.0 +wrapt==1.17.3 +yarl==1.23.0 +zstandard==0.25.0 \ No newline at end of file diff --git a/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html b/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html index b742c71581c..6f16073073f 100644 --- a/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html +++ b/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html @@ -480,7 +480,7 @@
-
+
+ + +
+
+
+ +
+ +
+ +
+ +
+ +
+
+
+ + +
+
+
Package
+ +
Version
+
+ +
+
+
+ +
+ +
+ + + + + +
+ +
+ +
+
+
+
+ +
+ +
+