Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1f6fb47
feat: add certificate management v2 API endpoints
wgu-jesse-stewart Apr 21, 2026
d2de81c
fix: linting
wgu-jesse-stewart Apr 21, 2026
4a355f5
fix: linting
wgu-jesse-stewart Apr 21, 2026
ea687fe
fix: linting
wgu-jesse-stewart Apr 21, 2026
5f7d3df
fix: linting
wgu-jesse-stewart Apr 21, 2026
5d08560
feat: PR feedback
wgu-jesse-stewart Apr 22, 2026
a394008
fix: Removed the unused invalidated_user_ids
wgu-jesse-stewart Apr 22, 2026
0d8809e
feat: update tests
wgu-jesse-stewart Apr 22, 2026
33364c0
feat: update tests
wgu-jesse-stewart Apr 22, 2026
2d2c62a
feat: update tests
wgu-jesse-stewart Apr 22, 2026
6b9a2e9
feat: update tests
wgu-jesse-stewart Apr 22, 2026
66e1243
feat: PR feedback
wgu-jesse-stewart Apr 22, 2026
1b43fc5
fix: tests
wgu-jesse-stewart Apr 22, 2026
d8ff883
feat: PR feedback
wgu-jesse-stewart Apr 23, 2026
684a245
fix: build
wgu-jesse-stewart Apr 23, 2026
2902be9
fix: tests
wgu-jesse-stewart Apr 23, 2026
daf8ce0
feat: wrap create_certificate_invalidation_entry in atomic
wgu-jesse-stewart Apr 23, 2026
397b8f8
Merge branch 'master' into wgu-jesse-stewart/instructor_dashboard_cer…
wgu-jesse-stewart Apr 23, 2026
9e74963
feat: add logging and max_length
wgu-jesse-stewart Apr 24, 2026
0592918
Merge branch 'wgu-jesse-stewart/instructor_dashboard_certificates_v2'…
wgu-jesse-stewart Apr 24, 2026
a317b19
feat: show all exceptions granted records
wgu-jesse-stewart Apr 24, 2026
5736077
fix: tests
wgu-jesse-stewart Apr 24, 2026
a56b2c3
feat: PR feedback
wgu-jesse-stewart Apr 24, 2026
7acb2a4
feat: adds bulk grant exception
wgu-jesse-stewart Apr 28, 2026
df144dd
Merge branch 'master' into wgu-jesse-stewart/instructor_dashboard_cer…
wgu-jesse-stewart Apr 28, 2026
3634c66
fix: linting
wgu-jesse-stewart Apr 28, 2026
472329e
fix: tests
wgu-jesse-stewart Apr 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lms/djangoapps/instructor/views/api_urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@
api_v2.CertificateExceptionsView.as_view(),
name='certificate_exceptions'
),
re_path(
rf'^courses/{COURSE_ID_PATTERN}/certificates/exceptions/bulk$',
api_v2.BulkCertificateExceptionsView.as_view(),
name='bulk_certificate_exceptions'
),
re_path(
rf'^courses/{COURSE_ID_PATTERN}/certificates/invalidations$',
api_v2.CertificateInvalidationsView.as_view(),
Expand Down
116 changes: 116 additions & 0 deletions lms/djangoapps/instructor/views/api_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2117,6 +2117,122 @@ def _validate_certificates_for_invalidation(learner_to_user, course_key):
return certificates_to_invalidate, errors


class BulkCertificateExceptionsView(DeveloperErrorViewMixin, APIView):
"""
View to grant certificate exceptions via CSV upload.

**Example Requests**

POST /api/instructor/v2/courses/{course_id}/certificates/exceptions/bulk

**POST Request Body**

Form data with CSV file uploaded as 'file' field.
CSV format: username_or_email,notes (optional second column)

**Returns**

* 200: OK - Bulk exceptions processed with success/error details
* 400: Bad Request - Invalid CSV file or format
* 401: Unauthorized - User is not authenticated
* 403: Forbidden - User lacks instructor permissions
"""
permission_classes = (IsAuthenticated, permissions.InstructorPermission)
permission_name = permissions.CERTIFICATE_EXCEPTION_VIEW

def post(self, request, course_id):
"""Grant certificate exceptions via CSV upload."""
course_key = CourseKey.from_string(course_id)
# Validate that the course exists
get_course_by_id(course_key)

# Check if file was uploaded
if 'file' not in request.FILES:
return Response(
{'message': _('No file uploaded')},
status=status.HTTP_400_BAD_REQUEST
)

uploaded_file = request.FILES['file']

# Validate file type
if not uploaded_file.name.endswith('.csv'):
return Response(
{'message': _('File must be in CSV format')},
status=status.HTTP_400_BAD_REQUEST
)

results = {
'success': [],
'errors': []
}

try:
# Read and parse CSV file
file_content = uploaded_file.read().decode('utf-8-sig')
csv_reader = csv.reader(file_content.splitlines())

learners_with_notes = []
for _row_num, row in enumerate(csv_reader, start=1):
if not row or not row[0].strip():
continue # Skip empty rows

learner = row[0].strip()
notes = row[1].strip() if len(row) > 1 and row[1].strip() else ''

learners_with_notes.append((learner, notes))

if not learners_with_notes:
return Response(
{'message': _('CSV file is empty or contains no valid entries')},
status=status.HTTP_400_BAD_REQUEST
)

# Extract just the learners for resolution
learners = [learner for learner, _ in learners_with_notes]
Comment on lines +2191 to +2192
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: the next() scan on line 2207 does a linear search through learners_with_notes for every learner, making the creation loop O(n²). Build a dict here and index into it below.

Suggested change
# Extract just the learners for resolution
learners = [learner for learner, _ in learners_with_notes]
# Extract learners for resolution and build a notes lookup
learners = [learner for learner, _ in learners_with_notes]
notes_by_learner = dict(learners_with_notes)


# Resolve all usernames/emails to users upfront
learner_to_user, user_errors = _resolve_learners_to_users(learners)
results['errors'].extend(user_errors)

# Validate learners for certificate exceptions
exceptions_to_create, validation_errors = _validate_learners_for_certificate_exceptions(
learner_to_user, course_key
)
results['errors'].extend(validation_errors)

# Create all exceptions using the certificates API
for learner, user in exceptions_to_create:
# Find the notes for this learner
notes = next((n for l, n in learners_with_notes if l == learner), '')
Comment on lines +2206 to +2207
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the dict built above:

Suggested change
# Find the notes for this learner
notes = next((n for l, n in learners_with_notes if l == learner), '')
notes = notes_by_learner.get(learner, '')


try:
certs_api.create_or_update_certificate_allowlist_entry(user, course_key, notes)
log.info(
"Certificate exception granted for user %s (%s) in course %s by %s via CSV upload",
user.id, learner, course_key, request.user.username
)
results['success'].append(learner)
except Exception as exc: # pylint: disable=broad-except
log.exception(
"Error creating certificate exception for user %s in course %s",
user.id, course_key
)
results['errors'].append({
'learner': learner,
'message': str(exc)
})

return Response(results, status=status.HTTP_200_OK)

except Exception as exc: # pylint: disable=broad-except
log.exception("Error processing CSV file for certificate exceptions")
return Response(
{'message': _('Error processing CSV file: {error}').format(error=str(exc))},
status=status.HTTP_400_BAD_REQUEST
)


class CertificateInvalidationsView(DeveloperErrorViewMixin, APIView):
"""
View to invalidate or re-validate certificates.
Expand Down
50 changes: 34 additions & 16 deletions openedx/core/lib/tests/test_jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,23 @@
test_user_id = 121
invalid_test_user_id = 120
test_timeout = 1000
test_now = int(time())
test_claims = {"foo": "bar", "baz": "quux", "meaning": 42}
expected_full_token = {
"lms_user_id": test_user_id,
"iat": test_now,
"exp": test_now + test_timeout,
"iss": "token-test-issuer", # these lines from test_settings.py
"version": "1.2.0", # these lines from test_settings.py
}


def get_test_now():
"""Get current time for test tokens."""
return int(time())


def get_expected_full_token(test_now):
"""Generate expected token with current timestamp."""
return {
"lms_user_id": test_user_id,
"iat": test_now,
"exp": test_now + test_timeout,
"iss": "token-test-issuer", # these lines from test_settings.py
"version": "1.2.0", # these lines from test_settings.py
}


@skip_unless_lms
Expand All @@ -30,25 +38,28 @@ class TestSign(unittest.TestCase):
"""

def test_create_jwt(self):
test_now = get_test_now()
token = create_jwt(test_user_id, test_timeout, {}, test_now)

decoded = unpack_and_verify(token)
self.assertEqual(expected_full_token, decoded) # noqa: PT009
self.assertEqual(get_expected_full_token(test_now), decoded) # noqa: PT009

def test_create_jwt_with_claims(self):
test_now = get_test_now()
token = create_jwt(test_user_id, test_timeout, test_claims, test_now)

expected_token_with_claims = expected_full_token.copy()
expected_token_with_claims = get_expected_full_token(test_now).copy()
expected_token_with_claims.update(test_claims)

decoded = unpack_and_verify(token)
self.assertEqual(expected_token_with_claims, decoded) # noqa: PT009

def test_malformed_token(self):
test_now = get_test_now()
token = create_jwt(test_user_id, test_timeout, test_claims, test_now)
token = token + "a"

expected_token_with_claims = expected_full_token.copy()
expected_token_with_claims = get_expected_full_token(test_now).copy()
expected_token_with_claims.update(test_claims)

with self.assertRaises(InvalidSignatureError): # noqa: PT027
Expand All @@ -62,53 +73,60 @@ class TestUnpack(unittest.TestCase):
"""

def test_unpack_jwt(self):
test_now = get_test_now()
token = create_jwt(test_user_id, test_timeout, {}, test_now)
decoded = unpack_jwt(token, test_user_id, test_now)

self.assertEqual(expected_full_token, decoded) # noqa: PT009
self.assertEqual(get_expected_full_token(test_now), decoded) # noqa: PT009

def test_unpack_jwt_with_claims(self):
test_now = get_test_now()
token = create_jwt(test_user_id, test_timeout, test_claims, test_now)

expected_token_with_claims = expected_full_token.copy()
expected_token_with_claims = get_expected_full_token(test_now).copy()
expected_token_with_claims.update(test_claims)

decoded = unpack_jwt(token, test_user_id, test_now)

self.assertEqual(expected_token_with_claims, decoded) # noqa: PT009

def test_malformed_token(self):
test_now = get_test_now()
token = create_jwt(test_user_id, test_timeout, test_claims, test_now)
token = token + "a"

expected_token_with_claims = expected_full_token.copy()
expected_token_with_claims = get_expected_full_token(test_now).copy()
expected_token_with_claims.update(test_claims)

with self.assertRaises(InvalidSignatureError): # noqa: PT027
unpack_jwt(token, test_user_id, test_now)

def test_unpack_token_with_invalid_user(self):
test_now = get_test_now()
token = create_jwt(invalid_test_user_id, test_timeout, {}, test_now)

with self.assertRaises(InvalidSignatureError): # noqa: PT027
unpack_jwt(token, test_user_id, test_now)

def test_unpack_expired_token(self):
test_now = get_test_now()
token = create_jwt(test_user_id, test_timeout, {}, test_now)

with self.assertRaises(ExpiredSignatureError): # noqa: PT027
unpack_jwt(token, test_user_id, test_now + test_timeout + 1)

def test_missing_expired_lms_user_id(self):
payload = expected_full_token.copy()
test_now = get_test_now()
payload = get_expected_full_token(test_now).copy()
del payload['lms_user_id']
token = _encode_and_sign(payload)

with self.assertRaises(MissingRequiredClaimError): # noqa: PT027
unpack_jwt(token, test_user_id, test_now)

def test_missing_expired_key(self):
payload = expected_full_token.copy()
test_now = get_test_now()
payload = get_expected_full_token(test_now).copy()
del payload['exp']
token = _encode_and_sign(payload)

Expand Down
Loading