Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
5c89658
Files added
SarahAsad23 May 4, 2026
c1a9574
test added
SarahAsad23 May 4, 2026
04c9c2c
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 4, 2026
6d11ab4
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 4, 2026
60257bb
fix comment
SarahAsad23 May 4, 2026
2845f67
remove redundancy
SarahAsad23 May 4, 2026
315b1f7
rename PackageRow
SarahAsad23 May 4, 2026
a65ef35
rename pveWebsocket
SarahAsad23 May 4, 2026
3050238
reduce user package row spacing
SarahAsad23 May 4, 2026
43afe3c
disable installing duplicate and system packages
SarahAsad23 May 4, 2026
f4ed6cc
added helper
SarahAsad23 May 4, 2026
6e78110
code refactor
SarahAsad23 May 4, 2026
e1068a0
formatting
SarahAsad23 May 4, 2026
c948a95
Merge branch 'pve-add-user-packages' of https://github.com/SarahAsad2…
SarahAsad23 May 4, 2026
9a977ad
cleanup
SarahAsad23 May 4, 2026
722cc2b
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 4, 2026
c51f962
revert
SarahAsad23 May 4, 2026
a3a2be8
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 5, 2026
6ed49f1
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 5, 2026
41e9642
rename operator
SarahAsad23 May 5, 2026
acd7f71
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 8, 2026
47c0859
Merge branch 'pve-add-user-packages' of https://github.com/SarahAsad2…
SarahAsad23 May 8, 2026
f915509
remove redundant headers
SarahAsad23 May 8, 2026
b1b1639
require op and version
SarahAsad23 May 8, 2026
2a7db5d
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 8, 2026
a71c4d4
update comment
SarahAsad23 May 8, 2026
a09b091
Added back op reqs
SarahAsad23 May 8, 2026
ef49f59
Added back op reqs
SarahAsad23 May 8, 2026
6ea9ca2
protect system packages
SarahAsad23 May 11, 2026
278fadd
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 11, 2026
309cd48
minor changes
SarahAsad23 May 11, 2026
4b0efce
Merge branch 'pve-add-user-packages' of https://github.com/SarahAsad2…
SarahAsad23 May 11, 2026
022e9ef
add header
SarahAsad23 May 11, 2026
0a26f6d
minor changes
SarahAsad23 May 12, 2026
6b1a1d5
minor changes
SarahAsad23 May 12, 2026
e5546cd
Merge branch 'main' into pve-add-user-packages
SarahAsad23 May 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import org.apache.texera.amber.config.PythonUtils
* for each Computing Unit
*
* It supports:
* - Creating and initializing isolated Python environments
* - Creating and initializing isolated Python environments (with system packages)
* - Installing user defined packages
* - Streaming pip output logs back to the caller
*
* Each PVE is stored under:
Expand All @@ -41,6 +42,11 @@ import org.apache.texera.amber.config.PythonUtils

object PveManager {

case class PvePackageResponse(
pveName: String,
userPackages: Seq[String]
)

private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs")

private def cuidDir(cuid: Int, pveName: String): Path = {
Expand All @@ -61,9 +67,63 @@ object PveManager {
"PIP_NO_INPUT" -> "1"
)

def getSystemPackages(): Seq[String] = {
val python = PythonUtils.getPythonExecutable
Process(Seq(python, "-m", "pip", "freeze")).!!.split("\n").map(_.trim).filter(_.nonEmpty).toSeq
private def readPackageFile(path: Path): Seq[String] = {
if (Files.exists(path)) {
Files
.readAllLines(path)
.asScala
.map(_.trim)
.filter(_.nonEmpty)
.toSeq
} else {
Seq()
}
}

private def getSystemPath(isLocal: Boolean): Path = {
Paths.get(
if (isLocal) "amber/system-requirements-lock.txt"
else "/tmp/system-requirements-lock.txt"
)
}

def getSystemPackages(isLocal: Boolean): Seq[String] = {
if (!Files.exists(getSystemPath(isLocal))) {
Seq()
} else {
Files
.readAllLines(getSystemPath(isLocal))
.asScala
.map(_.trim)
.filter(line => line.nonEmpty && !line.startsWith("#"))
.toSeq
}
}

private def runPipInstall(
python: String,
args: Seq[String],
queue: BlockingQueue[String]
): Int = {
Process(
Seq(
python,
"-u",
"-m",
"pip",
"install",
"--progress-bar",
"off",
"--no-input"
) ++ args,
None,
pipEnv.toSeq: _*
).!(
ProcessLogger(
out => queue.put(s"[pip] $out"),
err => queue.put(s"[pip][ERR] $err")
)
)
}

/**
Expand All @@ -85,23 +145,18 @@ object PveManager {
queue.put(s"[PVE] Creating new PVE for cuid: $cuid with name: $pveName")

// NOTE: These paths are derived from computing-unit-master.dockerfile.
// If requirements.txt or operator-requirements.txt locations change, update these paths.
// If requirements.txt location changes, update these paths.
val requirementsPath =
if (isLocal) Paths.get("amber", "requirements.txt")
else Paths.get("/tmp", "requirements.txt")

val operatorRequirementsPath =
if (isLocal) Paths.get("amber", "operator-requirements.txt")
else Paths.get("/tmp", "operator-requirements.txt")

if (!Files.exists(requirementsPath) || !Files.exists(operatorRequirementsPath)) {
if (!Files.exists(requirementsPath)) {
queue.put(s"[PVE][ERR] System requirements not found")
return
}

val venvDirPath = pveDir(cuid, pveName).toAbsolutePath
val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString
val envVars = pipEnv

val createVenvPython = PythonUtils.getPythonExecutable

Expand All @@ -121,43 +176,17 @@ object PveManager {
return
}

if (!Files.exists(requirementsPath)) {
queue.put(s"[PVE][ERR] requirements.txt not found at ${requirementsPath.toAbsolutePath}")
return
}

if (!Files.exists(operatorRequirementsPath)) {
queue.put(
s"[PVE][ERR] operator-requirements.txt not found at ${operatorRequirementsPath.toAbsolutePath}"
)
return
}

Comment thread
kunwp1 marked this conversation as resolved.
queue.put(
s"[PVE] Installing requirements from ${requirementsPath.toAbsolutePath} and ${operatorRequirementsPath.toAbsolutePath}"
s"[PVE] Installing requirements from ${requirementsPath.toAbsolutePath}"
)

val installReqCode = Process(
val installReqCode = runPipInstall(
python,
Seq(
python,
"-u",
"-m",
"pip",
"install",
"--progress-bar",
"off",
"-r",
requirementsPath.toString,
"-r",
operatorRequirementsPath.toString
requirementsPath.toString
),
None,
envVars.toSeq: _*
).!(
ProcessLogger(
out => queue.put(s"[pip] $out"),
err => queue.put(s"[pip][ERR] $err")
)
queue
)

queue.put(s"[PVE] requirements install finished with exit code $installReqCode")
Expand All @@ -170,7 +199,8 @@ object PveManager {
queue.put(s"[PVE] Created new environment for cuid = $cuid")
}

def getEnvironments(cuid: Int): List[String] = {
// returns list of PVE names and corresponding user packages for a given CU
def getEnvironments(cuid: Int): List[PvePackageResponse] = {

val cuPath = VenvRoot.resolve(cuid.toString)

Expand All @@ -185,7 +215,17 @@ object PveManager {
.iterator()
.asScala
.filter(path => Files.isDirectory(path))
.map(path => path.getFileName.toString)
.map { path =>
val pveName = path.getFileName.toString
val metadataPath = path.resolve("user-packages.txt")

val userPackages = readPackageFile(metadataPath)

PvePackageResponse(
pveName = pveName,
userPackages = userPackages
)
}
.toList
} finally {
stream.close()
Expand All @@ -212,4 +252,93 @@ object PveManager {
stream.close()
}
}

/**
* Installs user requested Python packages into the PVE.
*
* 1. Executes pip install for each package
* 2. Prevents conflicts with system dependencies.
* 3. Updates user metadata file
* 4. Streams logs back via queue
*/
def installUserPackages(
packages: List[String],
cuid: Int,
queue: BlockingQueue[String],
pveName: String,
isLocal: Boolean
): Unit = {

val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString

if (!Files.exists(Paths.get(python))) {
queue.put(s"[PVE][ERR] Python executable not found for PVE: $python")
return
}

val metadataPath = cuidDir(cuid, pveName).resolve("user-packages.txt")

var installedPackages = readPackageFile(metadataPath).toSet

val systemPackages =
if (Files.exists(getSystemPath(isLocal))) {
Files
.readAllLines(getSystemPath(isLocal))
.asScala
.map(_.trim)
.filter(line => line.nonEmpty && !line.startsWith("#"))
.map(line => line.split("==")(0).trim.toLowerCase)
.toSet
} else {
Set[String]()
}

packages.foreach { pkg =>
val trimmedPkg = pkg.trim

if (trimmedPkg.nonEmpty) {

val userPackageName = trimmedPkg.split("==")(0).trim.toLowerCase

if (systemPackages.contains(userPackageName)) {
queue.put(
s"[PVE][ERR] $trimmedPkg is a system package and cannot be installed or modified by the user."
)
return
}

queue.put(s"[PVE] Installing package: $trimmedPkg")

val code = runPipInstall(
python,
Seq(
"--constraint", // check against system-requirements-lock
getSystemPath(isLocal).toString,
trimmedPkg
),
queue
)

queue.put(s"[pip] install($trimmedPkg) finished with exit code $code")

if (code != 0) {
queue.put(s"[PVE][ERR] Failed to install package: $trimmedPkg")
return
}

installedPackages = installedPackages + trimmedPkg
Comment thread
SarahAsad23 marked this conversation as resolved.

Files.write(
metadataPath,
installedPackages.toSeq.sorted.asJava
)
}
}

queue.put("[PVE] Final user package list:")

installedPackages.toSeq.sorted.foreach { pkg =>
queue.put(s"[user-package] $pkg")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,32 @@ import java.util
@Consumes(Array(MediaType.APPLICATION_JSON))
class PveResource {
// --------------------------------------------------
// Get installed packages
// Get system packages
// --------------------------------------------------
@GET
@Path("/system")
@Produces(Array(MediaType.APPLICATION_JSON))
def getSystemPackages: util.Map[String, util.List[String]] = {
try {
val systemPkgs = PveManager.getSystemPackages().toList.asJava

// TODO: Support Kubernetes environment handling
val isLocal = true
Comment thread
SarahAsad23 marked this conversation as resolved.

val systemPkgs =
PveManager.getSystemPackages(isLocal).toList.asJava

Map("system" -> systemPkgs).asJava
} catch {
case e: Exception =>
e.printStackTrace()
throw new InternalServerErrorException("Failed to get system packages.")
throw new InternalServerErrorException(
"Failed to get system packages."
)
}
}

// --------------------------------------------------
// Fetch PVEs
// Fetch PVEs and Installed User Packages
// --------------------------------------------------
@GET
@Path("/pves")
Expand All @@ -54,9 +62,10 @@ class PveResource {
try {
PveManager
.getEnvironments(cuid)
.map { pveName =>
.map { pve =>
Map(
"pveName" -> pveName.asInstanceOf[Object]
"pveName" -> pve.pveName.asInstanceOf[Object],
"userPackages" -> pve.userPackages.asJava.asInstanceOf[Object]
).asJava
}
.asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

/**
* WebSocket endpoint for PVE creation that streams pip installation logs
* to the frontend in real time. The environment setup runs asynchronously,
* and output is pushed to the client until completion.
* WebSocket endpoint for PVE creation and user package installation that streams
* pip installation logs to the frontend in real time. The environment setup runs
* asynchronously, and output is pushed to the client until completion.
*/

@ServerEndpoint("/wsapi/pve")
Expand All @@ -42,12 +42,33 @@ class PveWebsocketResource {
val cuid = params.get("cuid").get(0).toInt
val pveName = params.get("pveName").get(0)
val isLocal = params.get("isLocal").get(0).toBoolean
val action = params.getOrDefault("action", java.util.List.of("create")).get(0)

val queue = new LinkedBlockingQueue[String]()

Future {
try {
PveManager.createNewPve(cuid, queue, pveName, isLocal)
action match {
case "create" =>
PveManager.createNewPve(cuid, queue, pveName, isLocal)

case "install" =>
val packages =
params
.getOrDefault("packages", java.util.List.of("[]"))
.get(0)
.stripPrefix("[")
.stripSuffix("]")
.split(",")
.toList
.map(_.replace("\"", "").trim)
.filter(_.nonEmpty)

PveManager.installUserPackages(packages, cuid, queue, pveName, isLocal)

case _ =>
queue.put(s"[ERR] Unknown action: $action")
}
} catch {
case e: Exception =>
queue.put(s"[ERR] ${e.getMessage}")
Expand All @@ -61,7 +82,6 @@ class PveWebsocketResource {

while (!done && session.isOpen) {
val line = queue.take()

Comment thread
kunwp1 marked this conversation as resolved.
session.getBasicRemote.sendText(line)

if (line == "__DONE__") {
Expand Down
Loading
Loading