|
8 | 8 | from crowdgit.errors import RepoLockingError |
9 | 9 | from crowdgit.models.repository import Repository |
10 | 10 | from crowdgit.models.service_execution import ServiceExecution |
11 | | -from crowdgit.settings import MAX_CONCURRENT_ONBOARDINGS, REPOSITORY_UPDATE_INTERVAL_HOURS |
| 11 | +from crowdgit.settings import ( |
| 12 | + MAX_CONCURRENT_ONBOARDINGS, |
| 13 | + MAX_INTEGRATION_RESULTS, |
| 14 | + REPOSITORY_UPDATE_INTERVAL_HOURS, |
| 15 | +) |
12 | 16 |
|
13 | 17 | from .connection import get_db_connection |
14 | 18 | from .registry import execute, executemany, fetchrow, fetchval, query |
@@ -143,13 +147,43 @@ async def acquire_recurrent_repo() -> Repository | None: |
143 | 147 | ) |
144 | 148 |
|
145 | 149 |
|
| 150 | +async def can_onboard_more(): |
| 151 | + """ |
| 152 | + Check if system can handle more repository onboarding based on activity load. |
| 153 | +
|
| 154 | + Returns False if integration.results count exceeds MAX_INTEGRATION_RESULTS |
| 155 | + or if the query fails (indicating high database load). |
| 156 | + """ |
| 157 | + try: |
| 158 | + integration_results_count = await fetchval("SELECT COUNT(*) FROM integration.results") |
| 159 | + return integration_results_count < MAX_INTEGRATION_RESULTS |
| 160 | + except Exception as e: |
| 161 | + logger.warning(f"Failed to get integration.results count with error: {repr(e)}") |
| 162 | + return False # if query failed mostly due to timeout then db is already under high load |
| 163 | + |
| 164 | + |
146 | 165 | async def acquire_repo_for_processing() -> Repository | None: |
147 | | - # prioritizing onboarding repositories |
148 | | - # TODO: document priority logic and values(0, 1, 2) |
149 | | - repo_to_process = await acquire_onboarding_repo() |
| 166 | + """ |
| 167 | + Acquire the next repository to process based on priority and system load. |
| 168 | +
|
| 169 | + Priority logic: |
| 170 | + 1. Onboarding repos (PENDING state) - only if system load allows and |
| 171 | + current onboarding count is below MAX_CONCURRENT_ONBOARDINGS |
| 172 | + 2. Recurrent repos (non-PENDING/non-PROCESSING) - fallback when onboarding |
| 173 | + is unavailable or skipped due to high load |
| 174 | +
|
| 175 | + Onboarding is delayed when integration.results exceeds MAX_INTEGRATION_RESULTS |
| 176 | + to prevent overloading the system during high activity periods. |
| 177 | + """ |
| 178 | + repo_to_process = None |
| 179 | + if await can_onboard_more(): |
| 180 | + repo_to_process = await acquire_onboarding_repo() |
| 181 | + else: |
| 182 | + logger.info("Skipping onboarding due to high load on integration.results") |
| 183 | + |
150 | 184 | if not repo_to_process: |
151 | | - # Fallback to non-onboarding repos |
152 | 185 | repo_to_process = await acquire_recurrent_repo() |
| 186 | + |
153 | 187 | return repo_to_process |
154 | 188 |
|
155 | 189 |
|
|
0 commit comments