fix: Pre-create S3A event log dir before SparkContext init#6317
fix: Pre-create S3A event log dir before SparkContext init#6317ntkathole merged 5 commits intofeast-dev:masterfrom
Conversation
c8351c5 to
448212d
Compare
R-behera
left a comment
There was a problem hiding this comment.
This looks like a useful guard for the S3A event log edge case, and the focused tests help. One follow-up worth considering is whether some Feast users rely on credentials or endpoint details only through Spark/Hadoop config rather than environment variables. If so, a short note or test around that path could prevent surprises when the pre-create step runs before Spark fully applies the config.
| "spark.hadoop.fs.s3a.endpoint", | ||
| os.environ.get("FEAST_S3A_ENDPOINT", ""), | ||
| ) | ||
| access_key = os.environ.get("AWS_ACCESS_KEY_ID", "") |
There was a problem hiding this comment.
access_key = spark_config.get(
"spark.hadoop.fs.s3a.access.key",
os.environ.get("AWS_ACCESS_KEY_ID", ""),
)
secret_key = spark_config.get(
"spark.hadoop.fs.s3a.secret.key",
os.environ.get("AWS_SECRET_ACCESS_KEY", ""),
)
session_token = spark_config.get(
"spark.hadoop.fs.s3a.session.token",
os.environ.get("AWS_SESSION_TOKEN", ""),
) or None
|
@abhijeet-dhumal Let's handle both comment from devin and @R-behera suggestion |
b60d47c to
19bdd11
Compare
@ntkathole Addressed both your comments ✅ |
@R-behera Good catch on the Spark/Hadoop config credentials path ✅ |
|
|
||
| endpoint = spark_config.get( | ||
| "spark.hadoop.fs.s3a.endpoint", | ||
| os.environ.get("FEAST_S3A_ENDPOINT", ""), |
There was a problem hiding this comment.
Wondering if this can be AWS_ENDPOINT_URL instead or atleast we need to document this new env var in our docs ?
There was a problem hiding this comment.
Good call — switched to AWS_ENDPOINT_URL . No custom env vars to document now. Spark config (spark.hadoop.fs.s3a.endpoint) still takes precedence when set.
|
@abhijeet-dhumal let's fix the linting |
| aws_access_key_id=access_key or None, | ||
| aws_secret_access_key=secret_key or None, | ||
| aws_session_token=session_token, | ||
| config=BotoConfig(signature_version="s3v4"), |
There was a problem hiding this comment.
Also, consider supporting minio or other path style
addressing_style = (
"path"
if spark_config.get("spark.hadoop.fs.s3a.path.style.access", "false").lower() == "true"
else "auto"
)
config=BotoConfig(
signature_version="s3v4",
s3={"addressing_style": addressing_style},
)
There was a problem hiding this comment.
Added .. - _ensure_s3a_event_log_dir now reads spark.hadoop.fs.s3a.path.style.access and passes addressing_style: "path" to BotoConfig when it's "true", otherwise defaults to "auto". Tests cover both paths
…prevent silent materialize failure Spark's EventLogFileWriter.requireLogBaseDirAsDirectory() is called inside SparkContext.__init__. When spark.eventLog.dir points to an S3A path that doesn't exist yet (S3 has no real directories), SparkContext fails to initialise — silently from Feast's perspective because _materialize_one() catches the exception and returns an ERROR job. Add _ensure_s3a_event_log_dir() to utils.py: before building the SparkSession, check if the S3A prefix exists and write a zero-byte placeholder if it doesn't. Uses boto3 (already a Feast dep via S3 offline store). Non-fatal: logs a warning and lets Spark surface its own error if the write fails. Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
… config, add session token support Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
…linting Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
22b7e8e to
70215e2
Compare
What this PR does / why we need it:
When spark.eventLog.enabled: "true" and spark.eventLog.dir points to an S3A path, feast materialize-incremental silently writes nothing to the online store and exits with code 0.
The failure chain:
S3 has no real directories. An empty prefix is indistinguishable from "does not exist", so Spark's pre-flight check always fails on a fresh bucket.
Which issue(s) this PR fixes:
In get_or_create_new_spark_session() (compute_engines/spark/utils.py), before building the SparkSession, call _ensure_s3a_event_log_dir() which:
No-ops for non-S3A paths (hdfs://, file://, etc.) and when event logging is disabled.
Checks
git commit -s)Testing Strategy
Misc