Update operator_engine/pr_actions/action_queue.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Alexa Amundson
2025-11-17 23:37:18 -06:00
committed by GitHub
parent e1c18249b8
commit 754ad0127e

View File

@@ -95,56 +95,58 @@ class PRActionQueue:
Returns: Returns:
action_id: Unique identifier for the action action_id: Unique identifier for the action
""" """
# Use default priority if not specified async with self._lock:
if priority is None: # Use default priority if not specified
priority = get_default_priority(action_type) if priority is None:
priority = get_default_priority(action_type)
# Create action # Create action
action = PRAction( action = PRAction(
action_id=str(uuid.uuid4()), action_id=str(uuid.uuid4()),
action_type=action_type, action_type=action_type,
repo_owner=repo_owner, repo_owner=repo_owner,
repo_name=repo_name, repo_name=repo_name,
pr_number=pr_number, pr_number=pr_number,
params=params, params=params,
priority=priority, priority=priority,
status=PRActionStatus.QUEUED, status=PRActionStatus.QUEUED,
created_at=datetime.utcnow(), created_at=datetime.utcnow(),
updated_at=datetime.utcnow(), updated_at=datetime.utcnow(),
triggered_by=triggered_by, triggered_by=triggered_by,
)
# Check for duplicates
duplicate_id = self._find_duplicate(action)
if duplicate_id:
logger.info(
f"Duplicate action found: {duplicate_id}. "
f"Skipping enqueue for {action.action_id}"
) )
return duplicate_id
# Add to queue # Check for duplicates
self._queue[action.action_id] = action duplicate_id = await self._find_duplicate(action)
if duplicate_id:
logger.info(
f"Duplicate action found: {duplicate_id}. "
f"Skipping enqueue for {action.action_id}"
)
return duplicate_id
logger.info( # Add to queue
f"Enqueued {action_type.value} for {repo_owner}/{repo_name}#{pr_number} " self._queue[action.action_id] = action
f"(priority: {priority.value}, id: {action.action_id})"
)
return action.action_id logger.info(
f"Enqueued {action_type.value} for {repo_owner}/{repo_name}#{pr_number} "
f"(priority: {priority.value}, id: {action.action_id})"
)
def _find_duplicate(self, action: PRAction) -> Optional[str]: return action.action_id
async def _find_duplicate(self, action: PRAction) -> Optional[str]:
"""Check if an identical action is already queued or processing""" """Check if an identical action is already queued or processing"""
for existing_id, existing in {**self._queue, **self._processing}.items(): async with self._lock:
if ( for existing_id, existing in {**self._queue, **self._processing}.items():
existing.action_type == action.action_type if (
and existing.repo_owner == action.repo_owner existing.action_type == action.action_type
and existing.repo_name == action.repo_name and existing.repo_owner == action.repo_owner
and existing.pr_number == action.pr_number and existing.repo_name == action.repo_name
and existing.params == action.params and existing.pr_number == action.pr_number
): and existing.params == action.params
return existing_id ):
return None return existing_id
return None
async def _worker(self, worker_id: int): async def _worker(self, worker_id: int):
"""Worker that processes actions from the queue""" """Worker that processes actions from the queue"""