Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 228 additions & 8 deletions src/gitfetch/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,7 @@ def get_authenticated_user(self) -> str:
"""
headers = {'Authorization': f'Bearer {self.token}'}
response = requests.post(
f'{self.base_url}/graphql',
f'{self.base_url}/query',
json={'query': query},
headers=headers,
timeout=10
Expand Down Expand Up @@ -949,7 +949,7 @@ def fetch_user_data(self, username: str) -> Dict[str, Any]:
headers = {
'Authorization': f'Bearer {self.token}'} if self.token else {}
response = requests.post(
f'{self.base_url}/graphql',
f'{self.base_url}/query',
json={'query': query},
headers=headers,
timeout=30
Expand All @@ -964,20 +964,240 @@ def fetch_user_stats(self, username: str, user_data=None):
"""
Fetch detailed statistics for a Sourcehut user.

Fetches the authenticated user's own repositories and their commit logs
via GraphQL, filtering commits by matching author email.

Args:
username: Sourcehut username
user_data: Optional pre-fetched user data

Returns:
Dictionary containing user statistics
"""
# Sourcehut has limited public stats, return minimal data
if not self.token:
return {
'total_stars': 0,
'total_forks': 0,
'total_repos': 0,
'languages': {},
'contribution_graph': [],
'pull_requests': {'open': 0, 'awaiting_review': 0, 'mentions': 0},
'issues': {'assigned': 0, 'created': 0, 'mentions': 0},
}

import requests
headers = {'Authorization': f'Bearer {self.token}'}

# Step 1: get the user's own email from the 'me' query
me_query = """
query {
me {
username
email
}
}
"""
try:
resp = requests.post(
f'{self.base_url}/query',
json={'query': me_query},
headers=headers,
timeout=10
)
resp.raise_for_status()
me_data = resp.json().get('data', {}).get('me', {})
user_email = me_data.get('email', '')
auth_username = me_data.get('username', username)
except Exception:
user_email = ''
auth_username = username

# Step 2: fetch repos for the authenticated user
repo_query = """
query {
me {
repositories(filter: { count: 100 }) {
results {
name
log {
cursor
results {
id
author {
email
time
}
}
}
}
}
}
}
"""
try:
resp = requests.post(
f'{self.base_url}/query',
json={'query': repo_query},
headers=headers,
timeout=30
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
raise Exception(f"Sourcehut API request failed: {e}")

repos = (data.get('data', {}).get('me', {})
.get('repositories', {}).get('results', []))

total_repos = len(repos)
commit_timestamps = []
languages = {}

for repo in repos:
repo_name = repo.get('name', '')
log = repo.get('log', {})
commits = log.get('results', [])
cursor = log.get('cursor')

# Process first page of commits
self._process_sourcehut_commits(
commits, user_email, commit_timestamps, languages
)

# Paginate through remaining commit pages
while cursor:
page_query = f"""
query {{
me {{
repository(name: "{repo_name}") {{
log(cursor: "{cursor}") {{
cursor
results {{
id
author {{
email
time
}}
}}
}}
}}
}}
}}
"""
try:
resp = requests.post(
f'{self.base_url}/query',
json={'query': page_query},
headers=headers,
timeout=30
)
resp.raise_for_status()
page_data = resp.json()
page_log = (page_data.get('data', {}).get('me', {})
.get('repository', {}).get('log', {}))
page_commits = page_log.get('results', [])
cursor = page_log.get('cursor')
self._process_sourcehut_commits(
page_commits, user_email, commit_timestamps, languages
)
except Exception:
break # Stop pagination on error

# Build contribution graph from timestamps
contrib_graph = self._build_sourcehut_contribution_graph(commit_timestamps)

# Calculate current streak
current_streak = self._calculate_sourcehut_streak(commit_timestamps)

return {
'total_stars': 0, # Not available
'total_forks': 0, # Not available
'total_repos': 0, # Would need separate API call
'languages': {}, # Not available
'contribution_graph': [], # Not available
'total_stars': 0,
'total_forks': 0,
'total_repos': total_repos,
'languages': languages,
'contribution_graph': contrib_graph,
'current_streak': current_streak,
'pull_requests': {'open': 0, 'awaiting_review': 0, 'mentions': 0},
'issues': {'assigned': 0, 'created': 0, 'mentions': 0},
}

@staticmethod
def _process_sourcehut_commits(commits, user_email, commit_timestamps, languages):
"""Process a list of Sourcehut commits, filtering by user email."""
from datetime import datetime
for commit in commits:
author = commit.get('author', {})
email = author.get('email', '')
time_str = author.get('time', '')

if user_email and email != user_email:
continue

if time_str:
try:
dt = datetime.fromisoformat(time_str.replace('Z', '+00:00'))
commit_timestamps.append(dt)
except (ValueError, TypeError):
pass

@staticmethod
def _build_sourcehut_contribution_graph(commit_timestamps):
"""Build a contribution graph (weeks of days) from commit timestamps."""
from datetime import datetime, timedelta
from collections import Counter

if not commit_timestamps:
return []

# Count commits per day
day_counts = Counter(dt.date().isoformat() for dt in commit_timestamps)

# Get date range (last year)
end_date = datetime.now().date()
start_date = end_date - timedelta(days=365)

# Build weeks
weeks = []
current_date = start_date
while current_date <= end_date:
week = {'contributionDays': []}
for i in range(7):
day_date = current_date + timedelta(days=i)
if day_date > end_date:
break
count = day_counts.get(day_date.isoformat(), 0)
week['contributionDays'].append({
'contributionCount': count,
'date': day_date.isoformat()
})
if week['contributionDays']:
weeks.append(week)
current_date += timedelta(days=7)

return weeks

@staticmethod
def _calculate_sourcehut_streak(commit_timestamps):
"""Calculate current consecutive-day contribution streak."""
from datetime import datetime, timedelta
from collections import Counter

if not commit_timestamps:
return 0

day_counts = Counter(dt.date() for dt in commit_timestamps)

streak = 0
current = datetime.now().date()
while current in day_counts and day_counts[current] > 0:
streak += 1
current -= timedelta(days=1)

# Check if today hasn't had commits yet — streak may still be active
# starting from yesterday
if streak == 0:
current = datetime.now().date() - timedelta(days=1)
while current in day_counts and day_counts[current] > 0:
streak += 1
current -= timedelta(days=1)

return streak
Loading