feat(skills): add github-pr-triage skill and update github-issue-triage

- Add github-pr-triage skill with conservative auto-close logic
- Update github-issue-triage ratio to 7:2:1 (unspecified-low:quick:writing)
- Add gh_fetch.py script for exhaustive GitHub pagination (issues/PRs)
- Script bundled in both skills + available standalone in uvscripts/
This commit is contained in:
YeonGyu-Kim
2026-02-02 10:47:22 +09:00
parent 08889b889a
commit 8927847336
5 changed files with 1704 additions and 6 deletions

View File

@@ -60,12 +60,12 @@ Extract from user request:
**Philosophy**: Use the cheapest agent that can do the job. Expensive agents = waste unless necessary.
### Default Ratio: `unspecified-low:8, quick:1, writing:1`
### Default Ratio: `unspecified-low:7, quick:2, writing:1`
| Category | Ratio | Use For | Cost |
|----------|-------|---------|------|
| `unspecified-low` | 80% | Standard issue analysis - read issue, fetch comments, categorize | $ |
| `quick` | 10% | Trivial issues - obvious duplicates, spam, clearly resolved | ¢ |
| `unspecified-low` | 70% | Standard issue analysis - read issue, fetch comments, categorize | $ |
| `quick` | 20% | Trivial issues - obvious duplicates, spam, clearly resolved | ¢ |
| `writing` | 10% | Report generation, response drafting, summary synthesis | $$ |
### When to Override Default Ratio
@@ -132,6 +132,32 @@ function assignAgentCategory(issues: Issue[], ratio: Record<string, number>): Ma
**YOU WILL FETCH EVERY. SINGLE. ISSUE. NO EXCEPTIONS.**
## USE THE BUNDLED SCRIPT (MANDATORY)
**Use the bundled `scripts/gh_fetch.py` script for exhaustive pagination:**
```bash
# Fetch all issues (no time filter)
./scripts/gh_fetch.py issues --state all --output json
# Fetch issues from last 48 hours
./scripts/gh_fetch.py issues --hours 48 --output json
# Fetch from specific repo
./scripts/gh_fetch.py issues --repo owner/repo --hours 48 --output json
```
The script:
- Handles pagination automatically (fetches ALL pages until empty)
- Outputs JSON that you can parse for agent distribution
- Filters by time range if `--hours` is specified
---
## FALLBACK: Manual Bash Pagination
If the Python script is unavailable, follow this manual approach:
## THE GOLDEN RULE
```
@@ -140,7 +166,7 @@ NEVER stop at first result. ALWAYS verify you got everything.
NEVER assume "that's probably all". ALWAYS check if more exist.
```
## MANDATORY PAGINATION LOOP (COPY-PASTE THIS EXACTLY)
## MANUAL PAGINATION LOOP (ONLY IF PYTHON SCRIPT UNAVAILABLE)
You MUST execute this EXACT pagination loop. DO NOT simplify. DO NOT skip iterations.
@@ -282,8 +308,8 @@ gh pr list --repo $REPO --state all --limit 500 --json number,title,state,create
```
Total issues: N
Agent categories based on ratio:
- unspecified-low: floor(N * 0.8)
- quick: floor(N * 0.1)
- unspecified-low: floor(N * 0.7)
- quick: floor(N * 0.2)
- writing: ceil(N * 0.1) # For report generation
```

View File

@@ -0,0 +1,373 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "typer>=0.12.0",
# "rich>=13.0.0",
# ]
# ///
"""
GitHub Issues/PRs Fetcher with Exhaustive Pagination.
Fetches ALL issues and/or PRs from a GitHub repository using gh CLI.
Implements proper pagination to ensure no items are missed.
Usage:
./gh_fetch.py issues # Fetch all issues
./gh_fetch.py prs # Fetch all PRs
./gh_fetch.py all # Fetch both issues and PRs
./gh_fetch.py issues --hours 48 # Issues from last 48 hours
./gh_fetch.py prs --state open # Only open PRs
./gh_fetch.py all --repo owner/repo # Specify repository
"""
import asyncio
import json
from datetime import UTC, datetime, timedelta
from enum import Enum
from typing import Annotated
import typer
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, TaskID
from rich.table import Table
app = typer.Typer(
name="gh_fetch",
help="Fetch GitHub issues/PRs with exhaustive pagination.",
no_args_is_help=True,
)
console = Console()
BATCH_SIZE = 500 # Maximum allowed by GitHub API
class ItemState(str, Enum):
ALL = "all"
OPEN = "open"
CLOSED = "closed"
class OutputFormat(str, Enum):
JSON = "json"
TABLE = "table"
COUNT = "count"
async def run_gh_command(args: list[str]) -> tuple[str, str, int]:
"""Run gh CLI command asynchronously."""
proc = await asyncio.create_subprocess_exec(
"gh",
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
return stdout.decode(), stderr.decode(), proc.returncode or 0
async def get_current_repo() -> str:
"""Get the current repository from gh CLI."""
stdout, stderr, code = await run_gh_command(["repo", "view", "--json", "nameWithOwner", "-q", ".nameWithOwner"])
if code != 0:
console.print(f"[red]Error getting current repo: {stderr}[/red]")
raise typer.Exit(1)
return stdout.strip()
async def fetch_items_page(
repo: str,
item_type: str, # "issue" or "pr"
state: str,
limit: int,
search_filter: str = "",
) -> list[dict]:
"""Fetch a single page of issues or PRs."""
cmd = [
item_type,
"list",
"--repo",
repo,
"--state",
state,
"--limit",
str(limit),
"--json",
"number,title,state,createdAt,updatedAt,labels,author,body",
]
if search_filter:
cmd.extend(["--search", search_filter])
stdout, stderr, code = await run_gh_command(cmd)
if code != 0:
console.print(f"[red]Error fetching {item_type}s: {stderr}[/red]")
return []
try:
return json.loads(stdout) if stdout.strip() else []
except json.JSONDecodeError:
console.print(f"[red]Error parsing {item_type} response[/red]")
return []
async def fetch_all_items(
repo: str,
item_type: str,
state: str,
hours: int | None,
progress: Progress,
task_id: TaskID,
) -> list[dict]:
"""Fetch ALL items with exhaustive pagination."""
all_items: list[dict] = []
page = 1
# First fetch
progress.update(task_id, description=f"[cyan]Fetching {item_type}s page {page}...")
items = await fetch_items_page(repo, item_type, state, BATCH_SIZE)
fetched_count = len(items)
all_items.extend(items)
console.print(f"[dim]Page {page}: fetched {fetched_count} {item_type}s[/dim]")
# Continue pagination if we got exactly BATCH_SIZE (more pages exist)
while fetched_count == BATCH_SIZE:
page += 1
progress.update(task_id, description=f"[cyan]Fetching {item_type}s page {page}...")
# Use created date of last item to paginate
last_created = all_items[-1].get("createdAt", "")
if not last_created:
break
search_filter = f"created:<{last_created}"
items = await fetch_items_page(repo, item_type, state, BATCH_SIZE, search_filter)
fetched_count = len(items)
if fetched_count == 0:
break
# Deduplicate by number
existing_numbers = {item["number"] for item in all_items}
new_items = [item for item in items if item["number"] not in existing_numbers]
all_items.extend(new_items)
console.print(
f"[dim]Page {page}: fetched {fetched_count}, added {len(new_items)} new (total: {len(all_items)})[/dim]"
)
# Safety limit
if page > 20:
console.print("[yellow]Safety limit reached (20 pages)[/yellow]")
break
# Filter by time if specified
if hours is not None:
cutoff = datetime.now(UTC) - timedelta(hours=hours)
cutoff_str = cutoff.isoformat()
original_count = len(all_items)
all_items = [
item
for item in all_items
if item.get("createdAt", "") >= cutoff_str or item.get("updatedAt", "") >= cutoff_str
]
filtered_count = original_count - len(all_items)
if filtered_count > 0:
console.print(f"[dim]Filtered out {filtered_count} items older than {hours} hours[/dim]")
return all_items
def display_table(items: list[dict], item_type: str) -> None:
"""Display items in a Rich table."""
table = Table(title=f"{item_type.upper()}s ({len(items)} total)")
table.add_column("#", style="cyan", width=6)
table.add_column("Title", style="white", max_width=50)
table.add_column("State", style="green", width=8)
table.add_column("Author", style="yellow", width=15)
table.add_column("Labels", style="magenta", max_width=30)
table.add_column("Updated", style="dim", width=12)
for item in items[:50]: # Show first 50
labels = ", ".join(label.get("name", "") for label in item.get("labels", []))
updated = item.get("updatedAt", "")[:10]
author = item.get("author", {}).get("login", "unknown")
table.add_row(
str(item.get("number", "")),
(item.get("title", "")[:47] + "...") if len(item.get("title", "")) > 50 else item.get("title", ""),
item.get("state", ""),
author,
(labels[:27] + "...") if len(labels) > 30 else labels,
updated,
)
console.print(table)
if len(items) > 50:
console.print(f"[dim]... and {len(items) - 50} more items[/dim]")
@app.command()
def issues(
repo: Annotated[str | None, typer.Option("--repo", "-r", help="Repository (owner/repo)")] = None,
state: Annotated[ItemState, typer.Option("--state", "-s", help="Issue state filter")] = ItemState.ALL,
hours: Annotated[
int | None,
typer.Option("--hours", "-h", help="Only issues from last N hours (created or updated)"),
] = None,
output: Annotated[OutputFormat, typer.Option("--output", "-o", help="Output format")] = OutputFormat.TABLE,
) -> None:
"""Fetch all issues with exhaustive pagination."""
async def async_main() -> None:
target_repo = repo or await get_current_repo()
console.print(f"""
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
[cyan]Repository:[/cyan] {target_repo}
[cyan]State:[/cyan] {state.value}
[cyan]Time filter:[/cyan] {f"Last {hours} hours" if hours else "All time"}
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
""")
with Progress(console=console) as progress:
task: TaskID = progress.add_task("[cyan]Fetching issues...", total=None)
items = await fetch_all_items(target_repo, "issue", state.value, hours, progress, task)
progress.update(task, description="[green]Complete!", completed=100, total=100)
console.print(
Panel(
f"[green]✓ Found {len(items)} issues[/green]",
title="[green]Pagination Complete[/green]",
border_style="green",
)
)
if output == OutputFormat.JSON:
console.print(json.dumps(items, indent=2, ensure_ascii=False))
elif output == OutputFormat.TABLE:
display_table(items, "issue")
else: # COUNT
console.print(f"Total issues: {len(items)}")
asyncio.run(async_main())
@app.command()
def prs(
repo: Annotated[str | None, typer.Option("--repo", "-r", help="Repository (owner/repo)")] = None,
state: Annotated[ItemState, typer.Option("--state", "-s", help="PR state filter")] = ItemState.OPEN,
hours: Annotated[
int | None,
typer.Option("--hours", "-h", help="Only PRs from last N hours (created or updated)"),
] = None,
output: Annotated[OutputFormat, typer.Option("--output", "-o", help="Output format")] = OutputFormat.TABLE,
) -> None:
"""Fetch all PRs with exhaustive pagination."""
async def async_main() -> None:
target_repo = repo or await get_current_repo()
console.print(f"""
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
[cyan]Repository:[/cyan] {target_repo}
[cyan]State:[/cyan] {state.value}
[cyan]Time filter:[/cyan] {f"Last {hours} hours" if hours else "All time"}
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
""")
with Progress(console=console) as progress:
task: TaskID = progress.add_task("[cyan]Fetching PRs...", total=None)
items = await fetch_all_items(target_repo, "pr", state.value, hours, progress, task)
progress.update(task, description="[green]Complete!", completed=100, total=100)
console.print(
Panel(
f"[green]✓ Found {len(items)} PRs[/green]",
title="[green]Pagination Complete[/green]",
border_style="green",
)
)
if output == OutputFormat.JSON:
console.print(json.dumps(items, indent=2, ensure_ascii=False))
elif output == OutputFormat.TABLE:
display_table(items, "pr")
else: # COUNT
console.print(f"Total PRs: {len(items)}")
asyncio.run(async_main())
@app.command(name="all")
def fetch_all(
repo: Annotated[str | None, typer.Option("--repo", "-r", help="Repository (owner/repo)")] = None,
state: Annotated[ItemState, typer.Option("--state", "-s", help="State filter")] = ItemState.ALL,
hours: Annotated[
int | None,
typer.Option("--hours", "-h", help="Only items from last N hours (created or updated)"),
] = None,
output: Annotated[OutputFormat, typer.Option("--output", "-o", help="Output format")] = OutputFormat.TABLE,
) -> None:
"""Fetch all issues AND PRs with exhaustive pagination."""
async def async_main() -> None:
target_repo = repo or await get_current_repo()
console.print(f"""
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
[cyan]Repository:[/cyan] {target_repo}
[cyan]State:[/cyan] {state.value}
[cyan]Time filter:[/cyan] {f"Last {hours} hours" if hours else "All time"}
[cyan]Fetching:[/cyan] Issues AND PRs
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
""")
with Progress(console=console) as progress:
issues_task: TaskID = progress.add_task("[cyan]Fetching issues...", total=None)
prs_task: TaskID = progress.add_task("[cyan]Fetching PRs...", total=None)
# Fetch in parallel
issues_items, prs_items = await asyncio.gather(
fetch_all_items(target_repo, "issue", state.value, hours, progress, issues_task),
fetch_all_items(target_repo, "pr", state.value, hours, progress, prs_task),
)
progress.update(
issues_task,
description="[green]Issues complete!",
completed=100,
total=100,
)
progress.update(prs_task, description="[green]PRs complete!", completed=100, total=100)
console.print(
Panel(
f"[green]✓ Found {len(issues_items)} issues and {len(prs_items)} PRs[/green]",
title="[green]Pagination Complete[/green]",
border_style="green",
)
)
if output == OutputFormat.JSON:
result = {"issues": issues_items, "prs": prs_items}
console.print(json.dumps(result, indent=2, ensure_ascii=False))
elif output == OutputFormat.TABLE:
display_table(issues_items, "issue")
console.print("")
display_table(prs_items, "pr")
else: # COUNT
console.print(f"Total issues: {len(issues_items)}")
console.print(f"Total PRs: {len(prs_items)}")
asyncio.run(async_main())
if __name__ == "__main__":
app()

View File

@@ -0,0 +1,553 @@
---
name: github-pr-triage
description: "Triage GitHub Pull Requests with parallel analysis. 1 PR = 1 background agent. Exhaustive pagination. Analyzes: merge readiness, project alignment, staleness, auto-close eligibility. Conservative auto-close with friendly messages. Triggers: 'triage PRs', 'analyze PRs', 'PR cleanup'."
---
# GitHub PR Triage Specialist
You are a GitHub Pull Request triage automation agent. Your job is to:
1. Fetch **EVERY SINGLE OPEN PR** using **EXHAUSTIVE PAGINATION**
2. Launch ONE background agent PER PR for parallel analysis
3. **CONSERVATIVELY** auto-close PRs that are clearly closeable
4. Generate a comprehensive triage report
---
# CRITICAL: EXHAUSTIVE PAGINATION IS MANDATORY
**THIS IS THE MOST IMPORTANT RULE. VIOLATION = COMPLETE FAILURE.**
## YOU MUST FETCH ALL PRs. PERIOD.
| WRONG | CORRECT |
|----------|------------|
| `gh pr list --limit 100` and stop | Paginate until ZERO results returned |
| "I found 16 PRs" (first page only) | "I found 61 PRs after 5 pages" |
| Assuming first page is enough | Using `--limit 500` and verifying count |
| Stopping when you "feel" you have enough | Stopping ONLY when API returns empty |
### WHY THIS MATTERS
- GitHub API returns **max 100 PRs per request** by default
- A busy repo can have **50-100+ open PRs**
- **MISSING PRs = MISSING CONTRIBUTOR WORK = BAD COMMUNITY EXPERIENCE**
- The user asked for triage, not "sample triage"
### THE ONLY ACCEPTABLE APPROACH
```bash
# ALWAYS use --limit 500 (maximum allowed)
# ALWAYS check if more pages exist
# ALWAYS continue until empty result
gh pr list --repo $REPO --state open --limit 500 --json number,title,state,createdAt,updatedAt,labels,author,headRefName,baseRefName,isDraft,mergeable,body
```
**If the result count equals your limit, THERE ARE MORE PRs. KEEP FETCHING.**
---
## PHASE 1: PR Collection (EXHAUSTIVE Pagination)
### 1.1 Determine Repository
Extract from user request:
- `REPO`: Repository in `owner/repo` format (default: current repo via `gh repo view --json nameWithOwner -q .nameWithOwner`)
### 1.2 Exhaustive Pagination Loop
# STOP. READ THIS BEFORE EXECUTING.
**YOU WILL FETCH EVERY. SINGLE. OPEN PR. NO EXCEPTIONS.**
## USE THE BUNDLED SCRIPT (MANDATORY)
**Use the bundled `scripts/gh_fetch.py` script for exhaustive pagination:**
```bash
# Fetch all open PRs (default)
./scripts/gh_fetch.py prs --output json
# Fetch PRs from last 48 hours
./scripts/gh_fetch.py prs --hours 48 --output json
# Fetch from specific repo
./scripts/gh_fetch.py prs --repo owner/repo --state open --output json
```
The script:
- Handles pagination automatically (fetches ALL pages until empty)
- Outputs JSON that you can parse for agent distribution
- Filters by time range if `--hours` is specified
---
## FALLBACK: Manual Bash Pagination
If the Python script is unavailable, follow this manual approach:
## THE GOLDEN RULE
```
NEVER use --limit 100. ALWAYS use --limit 500.
NEVER stop at first result. ALWAYS verify you got everything.
NEVER assume "that's probably all". ALWAYS check if more exist.
```
## MANUAL PAGINATION LOOP (ONLY IF PYTHON SCRIPT UNAVAILABLE)
You MUST execute this EXACT pagination loop. DO NOT simplify. DO NOT skip iterations.
```bash
#!/bin/bash
# MANDATORY PAGINATION - Execute this EXACTLY as written
REPO=$(gh repo view --json nameWithOwner -q .nameWithOwner)
echo "=== EXHAUSTIVE PR PAGINATION START ==="
echo "Repository: $REPO"
echo ""
# STEP 1: First fetch with --limit 500
echo "[Page 1] Fetching open PRs..."
FIRST_FETCH=$(gh pr list --repo $REPO --state open --limit 500 --json number,title,state,createdAt,updatedAt,labels,author,headRefName,baseRefName,isDraft,mergeable,body)
FIRST_COUNT=$(echo "$FIRST_FETCH" | jq 'length')
echo "[Page 1] Count: $FIRST_COUNT"
ALL_PRS="$FIRST_FETCH"
# STEP 2: CHECK IF MORE PAGES NEEDED
# If we got exactly 500, there are MORE PRs!
if [ "$FIRST_COUNT" -eq 500 ]; then
echo ""
echo "WARNING: Got exactly 500 results. MORE PAGES EXIST!"
echo "Continuing pagination..."
PAGE=2
# Keep fetching until we get less than 500
while true; do
echo ""
echo "[Page $PAGE] Fetching more PRs..."
# Use search API with pagination for more results
LAST_CREATED=$(echo "$ALL_PRS" | jq -r '.[-1].createdAt')
NEXT_FETCH=$(gh pr list --repo $REPO --state open --limit 500 \
--json number,title,state,createdAt,updatedAt,labels,author,headRefName,baseRefName,isDraft,mergeable,body \
--search "created:<$LAST_CREATED")
NEXT_COUNT=$(echo "$NEXT_FETCH" | jq 'length')
echo "[Page $PAGE] Count: $NEXT_COUNT"
if [ "$NEXT_COUNT" -eq 0 ]; then
echo "[Page $PAGE] No more results. Pagination complete."
break
fi
# Merge results
ALL_PRS=$(echo "$ALL_PRS $NEXT_FETCH" | jq -s 'add | unique_by(.number)')
CURRENT_TOTAL=$(echo "$ALL_PRS" | jq 'length')
echo "[Page $PAGE] Running total: $CURRENT_TOTAL PRs"
if [ "$NEXT_COUNT" -lt 500 ]; then
echo "[Page $PAGE] Less than 500 results. Pagination complete."
break
fi
PAGE=$((PAGE + 1))
# Safety limit
if [ $PAGE -gt 20 ]; then
echo "SAFETY LIMIT: Stopped at page 20"
break
fi
done
fi
# STEP 3: FINAL COUNT
FINAL_COUNT=$(echo "$ALL_PRS" | jq 'length')
echo ""
echo "=== EXHAUSTIVE PR PAGINATION COMPLETE ==="
echo "Total open PRs found: $FINAL_COUNT"
echo ""
```
## VERIFICATION CHECKLIST (MANDATORY)
BEFORE proceeding to Phase 2, you MUST verify:
```
CHECKLIST:
[ ] Executed the FULL pagination loop above (not just --limit 500 once)
[ ] Saw "EXHAUSTIVE PR PAGINATION COMPLETE" in output
[ ] Counted total PRs: _____ (fill this in)
[ ] If first fetch returned 500, continued to page 2+
[ ] Used --state open
```
**If you did NOT see "EXHAUSTIVE PR PAGINATION COMPLETE", you did it WRONG. Start over.**
---
## PHASE 2: Parallel PR Analysis (1 PR = 1 Agent)
### 2.1 Agent Assignment
**ALL PRs use `unspecified-low` category.** No ratio distribution needed.
### 2.2 Launch Background Agents
**MANDATORY: Each PR gets its own dedicated background agent.**
For each PR, launch:
```typescript
delegate_task(
category="unspecified-low",
load_skills=[],
run_in_background=true,
prompt=`
## TASK
Analyze GitHub PR #${pr.number} for ${REPO} to determine if it can be closed or merged.
## PR DATA
- Number: #${pr.number}
- Title: ${pr.title}
- State: ${pr.state}
- Author: ${pr.author.login}
- Created: ${pr.createdAt}
- Updated: ${pr.updatedAt}
- Labels: ${pr.labels.map(l => l.name).join(', ')}
- Head Branch: ${pr.headRefName}
- Base Branch: ${pr.baseRefName}
- Is Draft: ${pr.isDraft}
- Mergeable: ${pr.mergeable}
## PR BODY
${pr.body}
## FETCH ADDITIONAL CONTEXT
1. Fetch PR comments: gh pr view ${pr.number} --repo ${REPO} --json comments
2. Fetch PR reviews: gh pr view ${pr.number} --repo ${REPO} --json reviews
3. Fetch PR files changed: gh pr view ${pr.number} --repo ${REPO} --json files
4. Check if branch exists: git ls-remote --heads origin ${pr.headRefName}
5. Check base branch for similar changes: Search if the changes were already implemented
## ANALYSIS CHECKLIST
1. **MERGE_READY**: Can this PR be merged?
- Has approvals
- CI passed
- No conflicts
- Not draft
2. **PROJECT_ALIGNED**: Does this PR align with current project direction?
- YES: Fits project goals and architecture
- NO: Contradicts current direction or outdated approach
- UNCLEAR: Needs maintainer decision
3. **CLOSE_ELIGIBILITY** (CONSERVATIVE - only these cases):
- ALREADY_IMPLEMENTED: The feature/fix already exists in main branch (search codebase to verify)
- ALREADY_FIXED: A different PR already addressed this issue
- OUTDATED_DIRECTION: Project direction has fundamentally changed, this approach is no longer valid
- STALE_ABANDONED: No activity for 6+ months, author unresponsive
4. **STALENESS**:
- ACTIVE: Updated within 30 days
- STALE: No updates for 30-180 days
- ABANDONED: No updates for 180+ days
## CONSERVATIVE CLOSE CRITERIA
**YOU MAY ONLY RECOMMEND CLOSING IF ONE OF THESE IS CLEARLY TRUE:**
- The exact same change already exists in the main branch
- A merged PR already solved the same problem differently
- The project explicitly deprecated or removed the feature this PR adds
- Author has been unresponsive for 6+ months despite requests
**DO NOT CLOSE FOR:**
- "Could be done better" - that's a review comment, not a close reason
- "Needs rebasing" - contributor can fix this
- "Missing tests" - contributor can add these
- "I prefer a different approach" - discuss, don't close
## IF CLOSING IS RECOMMENDED
Provide a friendly, detailed English message explaining:
1. Why the PR is being closed
2. What happened (already implemented, etc.)
3. Thank the contributor for their effort
4. Offer guidance for future contributions
## RETURN FORMAT
\`\`\`
#${pr.number}: ${pr.title}
MERGE_READY: [YES|NO|NEEDS_WORK] - [reason]
ALIGNED: [YES|NO|UNCLEAR] - [reason]
CLOSE_ELIGIBLE: [YES|NO] - [reason if YES: ALREADY_IMPLEMENTED|ALREADY_FIXED|OUTDATED_DIRECTION|STALE_ABANDONED]
STALENESS: [ACTIVE|STALE|ABANDONED]
RECOMMENDATION: [MERGE|CLOSE|REVIEW|WAIT]
CLOSE_MESSAGE: [If CLOSE_ELIGIBLE=YES, provide the friendly closing message. Otherwise "N/A"]
SUMMARY: [1-2 sentence summary of PR status]
ACTION: [Recommended maintainer action]
\`\`\`
`
)
```
### 2.3 Collect All Results
Wait for all background agents to complete, then collect:
```typescript
// Store all task IDs
const taskIds: string[] = []
// Launch all agents
for (const pr of prs) {
const result = await delegate_task(...)
taskIds.push(result.task_id)
}
// Collect results
const results = []
for (const taskId of taskIds) {
const output = await background_output(task_id=taskId)
results.push(output)
}
```
---
## PHASE 3: Auto-Close Execution (CONSERVATIVE)
### 3.1 Identify Closeable PRs
From the collected results, identify PRs where:
- `CLOSE_ELIGIBLE: YES`
- Clear reason: `ALREADY_IMPLEMENTED`, `ALREADY_FIXED`, `OUTDATED_DIRECTION`, or `STALE_ABANDONED`
### 3.2 Close with Friendly Message
For each closeable PR:
```bash
gh pr close ${pr.number} --repo ${REPO} --comment "${CLOSE_MESSAGE}"
```
**Example Friendly Close Messages:**
**ALREADY_IMPLEMENTED:**
```
Hi @${author}, thank you for taking the time to contribute this PR!
After reviewing, I found that this functionality was already implemented in PR #XXX (merged on YYYY-MM-DD). The changes you proposed are now part of the main branch.
We really appreciate your effort and contribution to the project. Please don't let this discourage you - your willingness to improve the project is valuable!
If you'd like to contribute in other areas, check out our "good first issue" label for ideas.
Closing this as the changes are already in place. Thanks again!
```
**ALREADY_FIXED:**
```
Hi @${author}, thank you for this PR!
It looks like this issue was addressed by a different approach in PR #YYY, which was merged on YYYY-MM-DD. The underlying problem this PR aimed to solve has been resolved.
Thank you for your contribution and for caring about this issue. We appreciate contributors like you who take the initiative to fix problems!
Closing this as the issue is now resolved. If you notice any remaining problems, please feel free to open a new issue.
```
**OUTDATED_DIRECTION:**
```
Hi @${author}, thank you for working on this PR!
Since this PR was opened, the project direction has evolved. [Specific explanation of what changed - e.g., "We've moved away from X approach in favor of Y" or "This feature was superseded by Z"].
We genuinely appreciate the time you invested in this contribution. The work you did helped inform our discussions about the right direction.
Closing this due to the architectural changes. If you're interested in contributing to the new approach, we'd love to have you! Check out [relevant area] for opportunities.
```
**STALE_ABANDONED:**
```
Hi @${author}, thank you for opening this PR!
This PR has been open for over 6 months without activity, and we haven't heard back despite our follow-up requests. We're closing it to keep our PR queue manageable.
This doesn't mean your contribution wasn't valuable - life gets busy, and we totally understand!
If you'd like to pick this up again, feel free to:
1. Reopen this PR, or
2. Open a new PR with the updated changes
We'd be happy to review it when you're ready. Thanks for your interest in contributing!
```
---
## PHASE 4: Report Generation
### 4.1 Categorize Results
Group analyzed PRs by status:
| Category | Criteria |
|----------|----------|
| **READY_TO_MERGE** | Approved, CI passed, no conflicts |
| **AUTO_CLOSED** | Closed during this triage (with reasons) |
| **NEEDS_REVIEW** | Awaiting maintainer review |
| **NEEDS_WORK** | Requires changes from author |
| **STALE** | No activity for 30+ days |
| **DRAFT** | Still work in progress |
### 4.2 Generate Report
```markdown
# PR Triage Report
**Repository:** ${REPO}
**Generated:** ${new Date().toISOString()}
**Total Open PRs Analyzed:** ${prs.length}
## Summary
| Category | Count |
|----------|-------|
| Ready to Merge | N |
| Auto-Closed | N |
| Needs Review | N |
| Needs Work | N |
| Stale | N |
| Draft | N |
---
## 1. AUTO-CLOSED PRs (Action Taken)
These PRs were closed during this triage session:
| PR | Title | Reason | Message Posted |
|----|-------|--------|----------------|
| #123 | Feature X | ALREADY_IMPLEMENTED | Yes |
---
## 2. Ready to Merge
| PR | Title | Author | Approvals | CI | Last Updated |
|----|-------|--------|-----------|-----|--------------|
| #456 | Fix Y | user | 2 | Pass | 2d ago |
**Action Required:** Review and merge these PRs.
---
## 3. Needs Review
| PR | Title | Author | Created | Last Updated |
|----|-------|--------|---------|--------------|
| #789 | Add Z | user | 5d ago | 3d ago |
**Action Required:** Assign reviewers and provide feedback.
---
## 4. Needs Work
| PR | Title | Author | Issue |
|----|-------|--------|-------|
| #101 | Update A | user | Failing CI, needs rebase |
**Action Required:** Comment with specific guidance.
---
## 5. Stale PRs
| PR | Title | Author | Last Activity | Days Inactive |
|----|-------|--------|---------------|---------------|
| #112 | Old B | user | 2025-01-01 | 45 |
**Action Required:** Ping author or close if abandoned.
---
## 6. Draft PRs
| PR | Title | Author | Created |
|----|-------|--------|---------|
| #131 | WIP C | user | 10d ago |
**No action required** - authors are still working on these.
---
## Recommendations
1. **Merge immediately:** [list PRs ready]
2. **Assign reviewers:** [list PRs awaiting review]
3. **Follow up with authors:** [list stale PRs]
4. **Consider closing:** [list abandoned PRs not auto-closed due to uncertainty]
```
---
## ANTI-PATTERNS (BLOCKING VIOLATIONS)
## IF YOU DO ANY OF THESE, THE TRIAGE IS INVALID
| Violation | Why It's Wrong | Severity |
|-----------|----------------|----------|
| **Using `--limit 100`** | Misses 80%+ of PRs in active repos | CRITICAL |
| **Stopping at first fetch** | GitHub paginates - you only got page 1 | CRITICAL |
| **Not counting results** | Can't verify completeness | CRITICAL |
| **Closing PRs aggressively** | Hurts contributors, damages community | CRITICAL |
| Batching PRs (7 per agent) | Loses detail, harder to track | HIGH |
| Sequential agent calls | Slow, doesn't leverage parallelism | HIGH |
| Closing for "could be better" | That's review feedback, not close reason | HIGH |
| Generic close messages | Each closure needs specific, friendly explanation | MEDIUM |
---
## CONSERVATIVE CLOSE POLICY (MANDATORY)
**YOU ARE NOT THE MAINTAINER. YOU ARE A TRIAGE ASSISTANT.**
### You MAY close when:
- Evidence proves the change already exists in main
- A merged PR already solved the same problem
- Project explicitly deprecated/removed the relevant feature
- Author unresponsive for 6+ months despite attempts
### You MAY NOT close when:
- You think a different approach is better
- PR needs rebasing or has conflicts
- PR is missing tests or documentation
- You're unsure about project direction
- Author is active but busy
### When in doubt: DO NOT CLOSE. Flag for maintainer review.
---
## EXECUTION CHECKLIST
- [ ] Fetched ALL pages of open PRs (pagination complete)
- [ ] Launched 1 agent per PR (not batched)
- [ ] All agents ran in background (parallel)
- [ ] Collected all results before taking action
- [ ] Only closed PRs meeting CONSERVATIVE criteria
- [ ] Posted friendly, detailed close messages
- [ ] Generated comprehensive report
---
## Quick Start
When invoked, immediately:
1. `gh repo view --json nameWithOwner -q .nameWithOwner` (get current repo)
2. Exhaustive pagination for ALL open PRs
3. Launch N background agents (1 per PR)
4. Collect all results
5. Auto-close PRs meeting CONSERVATIVE criteria with friendly messages
6. Generate categorized report with action items

View File

@@ -0,0 +1,373 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "typer>=0.12.0",
# "rich>=13.0.0",
# ]
# ///
"""
GitHub Issues/PRs Fetcher with Exhaustive Pagination.
Fetches ALL issues and/or PRs from a GitHub repository using gh CLI.
Implements proper pagination to ensure no items are missed.
Usage:
./gh_fetch.py issues # Fetch all issues
./gh_fetch.py prs # Fetch all PRs
./gh_fetch.py all # Fetch both issues and PRs
./gh_fetch.py issues --hours 48 # Issues from last 48 hours
./gh_fetch.py prs --state open # Only open PRs
./gh_fetch.py all --repo owner/repo # Specify repository
"""
import asyncio
import json
from datetime import UTC, datetime, timedelta
from enum import Enum
from typing import Annotated
import typer
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, TaskID
from rich.table import Table
app = typer.Typer(
name="gh_fetch",
help="Fetch GitHub issues/PRs with exhaustive pagination.",
no_args_is_help=True,
)
console = Console()
BATCH_SIZE = 500 # Maximum allowed by GitHub API
class ItemState(str, Enum):
ALL = "all"
OPEN = "open"
CLOSED = "closed"
class OutputFormat(str, Enum):
JSON = "json"
TABLE = "table"
COUNT = "count"
async def run_gh_command(args: list[str]) -> tuple[str, str, int]:
"""Run gh CLI command asynchronously."""
proc = await asyncio.create_subprocess_exec(
"gh",
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
return stdout.decode(), stderr.decode(), proc.returncode or 0
async def get_current_repo() -> str:
"""Get the current repository from gh CLI."""
stdout, stderr, code = await run_gh_command(["repo", "view", "--json", "nameWithOwner", "-q", ".nameWithOwner"])
if code != 0:
console.print(f"[red]Error getting current repo: {stderr}[/red]")
raise typer.Exit(1)
return stdout.strip()
async def fetch_items_page(
repo: str,
item_type: str, # "issue" or "pr"
state: str,
limit: int,
search_filter: str = "",
) -> list[dict]:
"""Fetch a single page of issues or PRs."""
cmd = [
item_type,
"list",
"--repo",
repo,
"--state",
state,
"--limit",
str(limit),
"--json",
"number,title,state,createdAt,updatedAt,labels,author,body",
]
if search_filter:
cmd.extend(["--search", search_filter])
stdout, stderr, code = await run_gh_command(cmd)
if code != 0:
console.print(f"[red]Error fetching {item_type}s: {stderr}[/red]")
return []
try:
return json.loads(stdout) if stdout.strip() else []
except json.JSONDecodeError:
console.print(f"[red]Error parsing {item_type} response[/red]")
return []
async def fetch_all_items(
repo: str,
item_type: str,
state: str,
hours: int | None,
progress: Progress,
task_id: TaskID,
) -> list[dict]:
"""Fetch ALL items with exhaustive pagination."""
all_items: list[dict] = []
page = 1
# First fetch
progress.update(task_id, description=f"[cyan]Fetching {item_type}s page {page}...")
items = await fetch_items_page(repo, item_type, state, BATCH_SIZE)
fetched_count = len(items)
all_items.extend(items)
console.print(f"[dim]Page {page}: fetched {fetched_count} {item_type}s[/dim]")
# Continue pagination if we got exactly BATCH_SIZE (more pages exist)
while fetched_count == BATCH_SIZE:
page += 1
progress.update(task_id, description=f"[cyan]Fetching {item_type}s page {page}...")
# Use created date of last item to paginate
last_created = all_items[-1].get("createdAt", "")
if not last_created:
break
search_filter = f"created:<{last_created}"
items = await fetch_items_page(repo, item_type, state, BATCH_SIZE, search_filter)
fetched_count = len(items)
if fetched_count == 0:
break
# Deduplicate by number
existing_numbers = {item["number"] for item in all_items}
new_items = [item for item in items if item["number"] not in existing_numbers]
all_items.extend(new_items)
console.print(
f"[dim]Page {page}: fetched {fetched_count}, added {len(new_items)} new (total: {len(all_items)})[/dim]"
)
# Safety limit
if page > 20:
console.print("[yellow]Safety limit reached (20 pages)[/yellow]")
break
# Filter by time if specified
if hours is not None:
cutoff = datetime.now(UTC) - timedelta(hours=hours)
cutoff_str = cutoff.isoformat()
original_count = len(all_items)
all_items = [
item
for item in all_items
if item.get("createdAt", "") >= cutoff_str or item.get("updatedAt", "") >= cutoff_str
]
filtered_count = original_count - len(all_items)
if filtered_count > 0:
console.print(f"[dim]Filtered out {filtered_count} items older than {hours} hours[/dim]")
return all_items
def display_table(items: list[dict], item_type: str) -> None:
"""Display items in a Rich table."""
table = Table(title=f"{item_type.upper()}s ({len(items)} total)")
table.add_column("#", style="cyan", width=6)
table.add_column("Title", style="white", max_width=50)
table.add_column("State", style="green", width=8)
table.add_column("Author", style="yellow", width=15)
table.add_column("Labels", style="magenta", max_width=30)
table.add_column("Updated", style="dim", width=12)
for item in items[:50]: # Show first 50
labels = ", ".join(label.get("name", "") for label in item.get("labels", []))
updated = item.get("updatedAt", "")[:10]
author = item.get("author", {}).get("login", "unknown")
table.add_row(
str(item.get("number", "")),
(item.get("title", "")[:47] + "...") if len(item.get("title", "")) > 50 else item.get("title", ""),
item.get("state", ""),
author,
(labels[:27] + "...") if len(labels) > 30 else labels,
updated,
)
console.print(table)
if len(items) > 50:
console.print(f"[dim]... and {len(items) - 50} more items[/dim]")
@app.command()
def issues(
repo: Annotated[str | None, typer.Option("--repo", "-r", help="Repository (owner/repo)")] = None,
state: Annotated[ItemState, typer.Option("--state", "-s", help="Issue state filter")] = ItemState.ALL,
hours: Annotated[
int | None,
typer.Option("--hours", "-h", help="Only issues from last N hours (created or updated)"),
] = None,
output: Annotated[OutputFormat, typer.Option("--output", "-o", help="Output format")] = OutputFormat.TABLE,
) -> None:
"""Fetch all issues with exhaustive pagination."""
async def async_main() -> None:
target_repo = repo or await get_current_repo()
console.print(f"""
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
[cyan]Repository:[/cyan] {target_repo}
[cyan]State:[/cyan] {state.value}
[cyan]Time filter:[/cyan] {f"Last {hours} hours" if hours else "All time"}
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
""")
with Progress(console=console) as progress:
task: TaskID = progress.add_task("[cyan]Fetching issues...", total=None)
items = await fetch_all_items(target_repo, "issue", state.value, hours, progress, task)
progress.update(task, description="[green]Complete!", completed=100, total=100)
console.print(
Panel(
f"[green]✓ Found {len(items)} issues[/green]",
title="[green]Pagination Complete[/green]",
border_style="green",
)
)
if output == OutputFormat.JSON:
console.print(json.dumps(items, indent=2, ensure_ascii=False))
elif output == OutputFormat.TABLE:
display_table(items, "issue")
else: # COUNT
console.print(f"Total issues: {len(items)}")
asyncio.run(async_main())
@app.command()
def prs(
repo: Annotated[str | None, typer.Option("--repo", "-r", help="Repository (owner/repo)")] = None,
state: Annotated[ItemState, typer.Option("--state", "-s", help="PR state filter")] = ItemState.OPEN,
hours: Annotated[
int | None,
typer.Option("--hours", "-h", help="Only PRs from last N hours (created or updated)"),
] = None,
output: Annotated[OutputFormat, typer.Option("--output", "-o", help="Output format")] = OutputFormat.TABLE,
) -> None:
"""Fetch all PRs with exhaustive pagination."""
async def async_main() -> None:
target_repo = repo or await get_current_repo()
console.print(f"""
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
[cyan]Repository:[/cyan] {target_repo}
[cyan]State:[/cyan] {state.value}
[cyan]Time filter:[/cyan] {f"Last {hours} hours" if hours else "All time"}
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
""")
with Progress(console=console) as progress:
task: TaskID = progress.add_task("[cyan]Fetching PRs...", total=None)
items = await fetch_all_items(target_repo, "pr", state.value, hours, progress, task)
progress.update(task, description="[green]Complete!", completed=100, total=100)
console.print(
Panel(
f"[green]✓ Found {len(items)} PRs[/green]",
title="[green]Pagination Complete[/green]",
border_style="green",
)
)
if output == OutputFormat.JSON:
console.print(json.dumps(items, indent=2, ensure_ascii=False))
elif output == OutputFormat.TABLE:
display_table(items, "pr")
else: # COUNT
console.print(f"Total PRs: {len(items)}")
asyncio.run(async_main())
@app.command(name="all")
def fetch_all(
repo: Annotated[str | None, typer.Option("--repo", "-r", help="Repository (owner/repo)")] = None,
state: Annotated[ItemState, typer.Option("--state", "-s", help="State filter")] = ItemState.ALL,
hours: Annotated[
int | None,
typer.Option("--hours", "-h", help="Only items from last N hours (created or updated)"),
] = None,
output: Annotated[OutputFormat, typer.Option("--output", "-o", help="Output format")] = OutputFormat.TABLE,
) -> None:
"""Fetch all issues AND PRs with exhaustive pagination."""
async def async_main() -> None:
target_repo = repo or await get_current_repo()
console.print(f"""
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
[cyan]Repository:[/cyan] {target_repo}
[cyan]State:[/cyan] {state.value}
[cyan]Time filter:[/cyan] {f"Last {hours} hours" if hours else "All time"}
[cyan]Fetching:[/cyan] Issues AND PRs
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
""")
with Progress(console=console) as progress:
issues_task: TaskID = progress.add_task("[cyan]Fetching issues...", total=None)
prs_task: TaskID = progress.add_task("[cyan]Fetching PRs...", total=None)
# Fetch in parallel
issues_items, prs_items = await asyncio.gather(
fetch_all_items(target_repo, "issue", state.value, hours, progress, issues_task),
fetch_all_items(target_repo, "pr", state.value, hours, progress, prs_task),
)
progress.update(
issues_task,
description="[green]Issues complete!",
completed=100,
total=100,
)
progress.update(prs_task, description="[green]PRs complete!", completed=100, total=100)
console.print(
Panel(
f"[green]✓ Found {len(issues_items)} issues and {len(prs_items)} PRs[/green]",
title="[green]Pagination Complete[/green]",
border_style="green",
)
)
if output == OutputFormat.JSON:
result = {"issues": issues_items, "prs": prs_items}
console.print(json.dumps(result, indent=2, ensure_ascii=False))
elif output == OutputFormat.TABLE:
display_table(issues_items, "issue")
console.print("")
display_table(prs_items, "pr")
else: # COUNT
console.print(f"Total issues: {len(issues_items)}")
console.print(f"Total PRs: {len(prs_items)}")
asyncio.run(async_main())
if __name__ == "__main__":
app()

373
uvscripts/gh_fetch.py Executable file
View File

@@ -0,0 +1,373 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "typer>=0.12.0",
# "rich>=13.0.0",
# ]
# ///
"""
GitHub Issues/PRs Fetcher with Exhaustive Pagination.
Fetches ALL issues and/or PRs from a GitHub repository using gh CLI.
Implements proper pagination to ensure no items are missed.
Usage:
./gh_fetch.py issues # Fetch all issues
./gh_fetch.py prs # Fetch all PRs
./gh_fetch.py all # Fetch both issues and PRs
./gh_fetch.py issues --hours 48 # Issues from last 48 hours
./gh_fetch.py prs --state open # Only open PRs
./gh_fetch.py all --repo owner/repo # Specify repository
"""
import asyncio
import json
from datetime import UTC, datetime, timedelta
from enum import Enum
from typing import Annotated
import typer
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, TaskID
from rich.table import Table
app = typer.Typer(
name="gh_fetch",
help="Fetch GitHub issues/PRs with exhaustive pagination.",
no_args_is_help=True,
)
console = Console()
BATCH_SIZE = 500 # Maximum allowed by GitHub API
class ItemState(str, Enum):
ALL = "all"
OPEN = "open"
CLOSED = "closed"
class OutputFormat(str, Enum):
JSON = "json"
TABLE = "table"
COUNT = "count"
async def run_gh_command(args: list[str]) -> tuple[str, str, int]:
"""Run gh CLI command asynchronously."""
proc = await asyncio.create_subprocess_exec(
"gh",
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
return stdout.decode(), stderr.decode(), proc.returncode or 0
async def get_current_repo() -> str:
"""Get the current repository from gh CLI."""
stdout, stderr, code = await run_gh_command(["repo", "view", "--json", "nameWithOwner", "-q", ".nameWithOwner"])
if code != 0:
console.print(f"[red]Error getting current repo: {stderr}[/red]")
raise typer.Exit(1)
return stdout.strip()
async def fetch_items_page(
repo: str,
item_type: str, # "issue" or "pr"
state: str,
limit: int,
search_filter: str = "",
) -> list[dict]:
"""Fetch a single page of issues or PRs."""
cmd = [
item_type,
"list",
"--repo",
repo,
"--state",
state,
"--limit",
str(limit),
"--json",
"number,title,state,createdAt,updatedAt,labels,author,body",
]
if search_filter:
cmd.extend(["--search", search_filter])
stdout, stderr, code = await run_gh_command(cmd)
if code != 0:
console.print(f"[red]Error fetching {item_type}s: {stderr}[/red]")
return []
try:
return json.loads(stdout) if stdout.strip() else []
except json.JSONDecodeError:
console.print(f"[red]Error parsing {item_type} response[/red]")
return []
async def fetch_all_items(
repo: str,
item_type: str,
state: str,
hours: int | None,
progress: Progress,
task_id: TaskID,
) -> list[dict]:
"""Fetch ALL items with exhaustive pagination."""
all_items: list[dict] = []
page = 1
# First fetch
progress.update(task_id, description=f"[cyan]Fetching {item_type}s page {page}...")
items = await fetch_items_page(repo, item_type, state, BATCH_SIZE)
fetched_count = len(items)
all_items.extend(items)
console.print(f"[dim]Page {page}: fetched {fetched_count} {item_type}s[/dim]")
# Continue pagination if we got exactly BATCH_SIZE (more pages exist)
while fetched_count == BATCH_SIZE:
page += 1
progress.update(task_id, description=f"[cyan]Fetching {item_type}s page {page}...")
# Use created date of last item to paginate
last_created = all_items[-1].get("createdAt", "")
if not last_created:
break
search_filter = f"created:<{last_created}"
items = await fetch_items_page(repo, item_type, state, BATCH_SIZE, search_filter)
fetched_count = len(items)
if fetched_count == 0:
break
# Deduplicate by number
existing_numbers = {item["number"] for item in all_items}
new_items = [item for item in items if item["number"] not in existing_numbers]
all_items.extend(new_items)
console.print(
f"[dim]Page {page}: fetched {fetched_count}, added {len(new_items)} new (total: {len(all_items)})[/dim]"
)
# Safety limit
if page > 20:
console.print("[yellow]Safety limit reached (20 pages)[/yellow]")
break
# Filter by time if specified
if hours is not None:
cutoff = datetime.now(UTC) - timedelta(hours=hours)
cutoff_str = cutoff.isoformat()
original_count = len(all_items)
all_items = [
item
for item in all_items
if item.get("createdAt", "") >= cutoff_str or item.get("updatedAt", "") >= cutoff_str
]
filtered_count = original_count - len(all_items)
if filtered_count > 0:
console.print(f"[dim]Filtered out {filtered_count} items older than {hours} hours[/dim]")
return all_items
def display_table(items: list[dict], item_type: str) -> None:
"""Display items in a Rich table."""
table = Table(title=f"{item_type.upper()}s ({len(items)} total)")
table.add_column("#", style="cyan", width=6)
table.add_column("Title", style="white", max_width=50)
table.add_column("State", style="green", width=8)
table.add_column("Author", style="yellow", width=15)
table.add_column("Labels", style="magenta", max_width=30)
table.add_column("Updated", style="dim", width=12)
for item in items[:50]: # Show first 50
labels = ", ".join(label.get("name", "") for label in item.get("labels", []))
updated = item.get("updatedAt", "")[:10]
author = item.get("author", {}).get("login", "unknown")
table.add_row(
str(item.get("number", "")),
(item.get("title", "")[:47] + "...") if len(item.get("title", "")) > 50 else item.get("title", ""),
item.get("state", ""),
author,
(labels[:27] + "...") if len(labels) > 30 else labels,
updated,
)
console.print(table)
if len(items) > 50:
console.print(f"[dim]... and {len(items) - 50} more items[/dim]")
@app.command()
def issues(
repo: Annotated[str | None, typer.Option("--repo", "-r", help="Repository (owner/repo)")] = None,
state: Annotated[ItemState, typer.Option("--state", "-s", help="Issue state filter")] = ItemState.ALL,
hours: Annotated[
int | None,
typer.Option("--hours", "-h", help="Only issues from last N hours (created or updated)"),
] = None,
output: Annotated[OutputFormat, typer.Option("--output", "-o", help="Output format")] = OutputFormat.TABLE,
) -> None:
"""Fetch all issues with exhaustive pagination."""
async def async_main() -> None:
target_repo = repo or await get_current_repo()
console.print(f"""
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
[cyan]Repository:[/cyan] {target_repo}
[cyan]State:[/cyan] {state.value}
[cyan]Time filter:[/cyan] {f"Last {hours} hours" if hours else "All time"}
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
""")
with Progress(console=console) as progress:
task: TaskID = progress.add_task("[cyan]Fetching issues...", total=None)
items = await fetch_all_items(target_repo, "issue", state.value, hours, progress, task)
progress.update(task, description="[green]Complete!", completed=100, total=100)
console.print(
Panel(
f"[green]✓ Found {len(items)} issues[/green]",
title="[green]Pagination Complete[/green]",
border_style="green",
)
)
if output == OutputFormat.JSON:
console.print(json.dumps(items, indent=2, ensure_ascii=False))
elif output == OutputFormat.TABLE:
display_table(items, "issue")
else: # COUNT
console.print(f"Total issues: {len(items)}")
asyncio.run(async_main())
@app.command()
def prs(
repo: Annotated[str | None, typer.Option("--repo", "-r", help="Repository (owner/repo)")] = None,
state: Annotated[ItemState, typer.Option("--state", "-s", help="PR state filter")] = ItemState.OPEN,
hours: Annotated[
int | None,
typer.Option("--hours", "-h", help="Only PRs from last N hours (created or updated)"),
] = None,
output: Annotated[OutputFormat, typer.Option("--output", "-o", help="Output format")] = OutputFormat.TABLE,
) -> None:
"""Fetch all PRs with exhaustive pagination."""
async def async_main() -> None:
target_repo = repo or await get_current_repo()
console.print(f"""
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
[cyan]Repository:[/cyan] {target_repo}
[cyan]State:[/cyan] {state.value}
[cyan]Time filter:[/cyan] {f"Last {hours} hours" if hours else "All time"}
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
""")
with Progress(console=console) as progress:
task: TaskID = progress.add_task("[cyan]Fetching PRs...", total=None)
items = await fetch_all_items(target_repo, "pr", state.value, hours, progress, task)
progress.update(task, description="[green]Complete!", completed=100, total=100)
console.print(
Panel(
f"[green]✓ Found {len(items)} PRs[/green]",
title="[green]Pagination Complete[/green]",
border_style="green",
)
)
if output == OutputFormat.JSON:
console.print(json.dumps(items, indent=2, ensure_ascii=False))
elif output == OutputFormat.TABLE:
display_table(items, "pr")
else: # COUNT
console.print(f"Total PRs: {len(items)}")
asyncio.run(async_main())
@app.command(name="all")
def fetch_all(
repo: Annotated[str | None, typer.Option("--repo", "-r", help="Repository (owner/repo)")] = None,
state: Annotated[ItemState, typer.Option("--state", "-s", help="State filter")] = ItemState.ALL,
hours: Annotated[
int | None,
typer.Option("--hours", "-h", help="Only items from last N hours (created or updated)"),
] = None,
output: Annotated[OutputFormat, typer.Option("--output", "-o", help="Output format")] = OutputFormat.TABLE,
) -> None:
"""Fetch all issues AND PRs with exhaustive pagination."""
async def async_main() -> None:
target_repo = repo or await get_current_repo()
console.print(f"""
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
[cyan]Repository:[/cyan] {target_repo}
[cyan]State:[/cyan] {state.value}
[cyan]Time filter:[/cyan] {f"Last {hours} hours" if hours else "All time"}
[cyan]Fetching:[/cyan] Issues AND PRs
[cyan]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/cyan]
""")
with Progress(console=console) as progress:
issues_task: TaskID = progress.add_task("[cyan]Fetching issues...", total=None)
prs_task: TaskID = progress.add_task("[cyan]Fetching PRs...", total=None)
# Fetch in parallel
issues_items, prs_items = await asyncio.gather(
fetch_all_items(target_repo, "issue", state.value, hours, progress, issues_task),
fetch_all_items(target_repo, "pr", state.value, hours, progress, prs_task),
)
progress.update(
issues_task,
description="[green]Issues complete!",
completed=100,
total=100,
)
progress.update(prs_task, description="[green]PRs complete!", completed=100, total=100)
console.print(
Panel(
f"[green]✓ Found {len(issues_items)} issues and {len(prs_items)} PRs[/green]",
title="[green]Pagination Complete[/green]",
border_style="green",
)
)
if output == OutputFormat.JSON:
result = {"issues": issues_items, "prs": prs_items}
console.print(json.dumps(result, indent=2, ensure_ascii=False))
elif output == OutputFormat.TABLE:
display_table(issues_items, "issue")
console.print("")
display_table(prs_items, "pr")
else: # COUNT
console.print(f"Total issues: {len(issues_items)}")
console.print(f"Total PRs: {len(prs_items)}")
asyncio.run(async_main())
if __name__ == "__main__":
app()