Skip to content

feat: Add centralized rate limiter#305

Open
vanitabhagwat wants to merge 28 commits intomasterfrom
ratelimiter
Open

feat: Add centralized rate limiter#305
vanitabhagwat wants to merge 28 commits intomasterfrom
ratelimiter

Conversation

@vanitabhagwat
Copy link
Collaborator

@vanitabhagwat vanitabhagwat commented Oct 17, 2025

This PR is to create a more generic solution for write rate limiting that can be utilized by all online stores.

@vanitabhagwat vanitabhagwat changed the title add centralized rate limiter feat: add centralized rate limiter Oct 17, 2025
@vanitabhagwat vanitabhagwat changed the title feat: add centralized rate limiter feat: Add centralized rate limiter Oct 17, 2025
Comment on lines 300 to 308
# Leaving one core for operating system and other background processes
num_processes = num_spark_driver_cores - 1

if table.num_rows < num_processes:
num_processes = table.num_rows

# Input table is split into smaller chunks and processed in parallel
chunks = self.split_table(num_processes, table)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you intentionally deleting the comments here? There are some comment deletions throughout.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't intentional. I ran the black and ruff formatting which probably did that. Added them back.

Comment on lines 34 to 47
backoff = 0.005 # initial minimal sleep
while not self.acquire():
# Compute estimated sleep until oldest timestamp exits window.
# We use the current index position as the next candidate slot.
now = time.time()
with self._lock:
oldest_ts = self.timestamps[self.index]
remaining = oldest_ts + self.period - now
if remaining <= 0:
continue
# Sleep the smaller of remaining and a capped value to re-check frequently.
time.sleep(min(remaining, 0.05))
# Optional exponential backoff (bounded) if still not free.
backoff = min(backoff * 2, 0.05)
Copy link
Collaborator

@piket piket Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like backoff is actually used anywhere, it's only recalculated. Is this meant to be part of the minimum sleep instead of the hardcoded 0.05?

entities_to_keep: Sequence[Entity],
partial: bool,
):
# Call update only if there is an online store
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you recomment this as well.

Copy link
Collaborator

@piket piket left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

out, err := cmd.CombinedOutput()
if err != nil {
log.Printf("Repo init error: %s", string(out))
log.Fatal(err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.Fatal() calls os.exit(1) which will exit the test process. We need to use log.Printf()

Comment on lines 221 to 222
percent_usage = 0.6
interval = 1.0 # seconds
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

percent_usage -> Can we define this based on number of processors?
interval -> Good. We set is based on previous logic so no confusion here.

Copy link
Collaborator

@piket piket left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

vbhagwat and others added 4 commits February 23, 2026 13:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants