diff --git a/src/gitfetch/fetcher.py b/src/gitfetch/fetcher.py index c86e972..2b55d12 100644 --- a/src/gitfetch/fetcher.py +++ b/src/gitfetch/fetcher.py @@ -911,7 +911,7 @@ def get_authenticated_user(self) -> str: """ headers = {'Authorization': f'Bearer {self.token}'} response = requests.post( - f'{self.base_url}/graphql', + f'{self.base_url}/query', json={'query': query}, headers=headers, timeout=10 @@ -949,7 +949,7 @@ def fetch_user_data(self, username: str) -> Dict[str, Any]: headers = { 'Authorization': f'Bearer {self.token}'} if self.token else {} response = requests.post( - f'{self.base_url}/graphql', + f'{self.base_url}/query', json={'query': query}, headers=headers, timeout=30 @@ -964,6 +964,9 @@ def fetch_user_stats(self, username: str, user_data=None): """ Fetch detailed statistics for a Sourcehut user. + Fetches the authenticated user's own repositories and their commit logs + via GraphQL, filtering commits by matching author email. + Args: username: Sourcehut username user_data: Optional pre-fetched user data @@ -971,13 +974,230 @@ def fetch_user_stats(self, username: str, user_data=None): Returns: Dictionary containing user statistics """ - # Sourcehut has limited public stats, return minimal data + if not self.token: + return { + 'total_stars': 0, + 'total_forks': 0, + 'total_repos': 0, + 'languages': {}, + 'contribution_graph': [], + 'pull_requests': {'open': 0, 'awaiting_review': 0, 'mentions': 0}, + 'issues': {'assigned': 0, 'created': 0, 'mentions': 0}, + } + + import requests + headers = {'Authorization': f'Bearer {self.token}'} + + # Step 1: get the user's own email from the 'me' query + me_query = """ + query { + me { + username + email + } + } + """ + try: + resp = requests.post( + f'{self.base_url}/query', + json={'query': me_query}, + headers=headers, + timeout=10 + ) + resp.raise_for_status() + me_data = resp.json().get('data', {}).get('me', {}) + user_email = me_data.get('email', '') + auth_username = me_data.get('username', username) + except Exception: + user_email = '' + auth_username = username + + # Step 2: fetch repos for the authenticated user + repo_query = """ + query { + me { + repositories(filter: { count: 100 }) { + results { + name + log { + cursor + results { + id + author { + email + time + } + } + } + } + } + } + } + """ + try: + resp = requests.post( + f'{self.base_url}/query', + json={'query': repo_query}, + headers=headers, + timeout=30 + ) + resp.raise_for_status() + data = resp.json() + except Exception as e: + raise Exception(f"Sourcehut API request failed: {e}") + + repos = (data.get('data', {}).get('me', {}) + .get('repositories', {}).get('results', [])) + + total_repos = len(repos) + commit_timestamps = [] + languages = {} + + for repo in repos: + repo_name = repo.get('name', '') + log = repo.get('log', {}) + commits = log.get('results', []) + cursor = log.get('cursor') + + # Process first page of commits + self._process_sourcehut_commits( + commits, user_email, commit_timestamps, languages + ) + + # Paginate through remaining commit pages + while cursor: + page_query = f""" + query {{ + me {{ + repository(name: "{repo_name}") {{ + log(cursor: "{cursor}") {{ + cursor + results {{ + id + author {{ + email + time + }} + }} + }} + }} + }} + }} + """ + try: + resp = requests.post( + f'{self.base_url}/query', + json={'query': page_query}, + headers=headers, + timeout=30 + ) + resp.raise_for_status() + page_data = resp.json() + page_log = (page_data.get('data', {}).get('me', {}) + .get('repository', {}).get('log', {})) + page_commits = page_log.get('results', []) + cursor = page_log.get('cursor') + self._process_sourcehut_commits( + page_commits, user_email, commit_timestamps, languages + ) + except Exception: + break # Stop pagination on error + + # Build contribution graph from timestamps + contrib_graph = self._build_sourcehut_contribution_graph(commit_timestamps) + + # Calculate current streak + current_streak = self._calculate_sourcehut_streak(commit_timestamps) + return { - 'total_stars': 0, # Not available - 'total_forks': 0, # Not available - 'total_repos': 0, # Would need separate API call - 'languages': {}, # Not available - 'contribution_graph': [], # Not available + 'total_stars': 0, + 'total_forks': 0, + 'total_repos': total_repos, + 'languages': languages, + 'contribution_graph': contrib_graph, + 'current_streak': current_streak, 'pull_requests': {'open': 0, 'awaiting_review': 0, 'mentions': 0}, 'issues': {'assigned': 0, 'created': 0, 'mentions': 0}, } + + @staticmethod + def _process_sourcehut_commits(commits, user_email, commit_timestamps, languages): + """Process a list of Sourcehut commits, filtering by user email.""" + from datetime import datetime + for commit in commits: + author = commit.get('author', {}) + email = author.get('email', '') + time_str = author.get('time', '') + + if user_email and email != user_email: + continue + + if time_str: + try: + dt = datetime.fromisoformat(time_str.replace('Z', '+00:00')) + commit_timestamps.append(dt) + except (ValueError, TypeError): + pass + + @staticmethod + def _build_sourcehut_contribution_graph(commit_timestamps): + """Build a contribution graph (weeks of days) from commit timestamps.""" + from datetime import datetime, timedelta + from collections import Counter + + if not commit_timestamps: + return [] + + # Count commits per day + day_counts = Counter(dt.date().isoformat() for dt in commit_timestamps) + + # Get date range (last year) + end_date = datetime.now().date() + start_date = end_date - timedelta(days=365) + + # Build weeks + weeks = [] + current_date = start_date + while current_date <= end_date: + week = {'contributionDays': []} + for i in range(7): + day_date = current_date + timedelta(days=i) + if day_date > end_date: + break + count = day_counts.get(day_date.isoformat(), 0) + week['contributionDays'].append({ + 'contributionCount': count, + 'date': day_date.isoformat() + }) + if week['contributionDays']: + weeks.append(week) + current_date += timedelta(days=7) + + return weeks + + @staticmethod + def _calculate_sourcehut_streak(commit_timestamps): + """Calculate current consecutive-day contribution streak.""" + from datetime import datetime, timedelta + from collections import Counter + + if not commit_timestamps: + return 0 + + day_counts = Counter(dt.date() for dt in commit_timestamps) + + streak = 0 + current = datetime.now().date() + while current in day_counts and day_counts[current] > 0: + streak += 1 + current -= timedelta(days=1) + + # Check if today hasn't had commits yet — streak may still be active + # starting from yesterday + if streak == 0: + current = datetime.now().date() - timedelta(days=1) + while current in day_counts and day_counts[current] > 0: + streak += 1 + current -= timedelta(days=1) + + return streak