Skip to content

Commit 6fa9e88

Browse files
committed
Fundamental test coverage of userdb module reusing existing structure.
1 parent 8e7c396 commit 6fa9e88

File tree

1 file changed

+280
-0
lines changed

1 file changed

+280
-0
lines changed

tests/test_mig_shared_userdb.py

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# --- BEGIN_HEADER ---
4+
#
5+
# test_mig_shared_userdb - unit tests for shared user database handling
6+
# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH
7+
#
8+
# This file is part of MiG.
9+
#
10+
# MiG is free software: you can redistribute it and/or modify
11+
# it under the terms of the GNU General Public License as published by
12+
# the Free Software Foundation; either version 2 of the License, or
13+
# (at your option) any later version.
14+
#
15+
# MiG is distributed in the hope that it will be useful,
16+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18+
# GNU General Public License for more details.
19+
#
20+
# You should have received a copy of the GNU General Public License
21+
# along with this program; if not, write to the Free Software
22+
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
23+
# USA.
24+
#
25+
# -- END_HEADER ---
26+
#
27+
28+
"""Unit tests for user database functionality"""
29+
30+
import os
31+
import time
32+
import unittest
33+
34+
from mig.shared.base import distinguished_name_to_user
35+
from mig.shared.fileio import delete_file
36+
from mig.shared.serial import loads
37+
from mig.shared.userdb import default_db_path, load_user_db, load_user_dict, \
38+
lock_user_db, save_user_db, save_user_dict, unlock_user_db, \
39+
update_user_dict
40+
from tests.support import MigTestCase, ensure_dirs_exist, testmain
41+
42+
TEST_USER_ID = '/C=DK/ST=NA/L=NA/O=Test Org/OU=NA/CN=Test User/emailAddress=test@example.com'
43+
THIS_USER_ID = '/C=DK/ST=NA/L=NA/O=Local Org/OU=NA/CN=This User/emailAddress=this.user@here.org'
44+
OTHER_USER_ID = '/C=DK/ST=NA/L=NA/O=Other Org/OU=NA/CN=Other User/emailAddress=other.user@there.org'
45+
46+
47+
class TestMigSharedUserDB(MigTestCase):
48+
"""Unit tests for user database functions in mig/shared/userdb.py"""
49+
50+
def _provide_configuration(self):
51+
"""Get test configuration"""
52+
return 'testconfig'
53+
54+
# Helper methods
55+
def _create_sample_db(self, content=None, db_path=None):
56+
"""Create sample user DB file with given content"""
57+
if db_path is None:
58+
db_path = self.user_db_path
59+
if content is None:
60+
sample_db = {
61+
TEST_USER_ID: distinguished_name_to_user(TEST_USER_ID),
62+
THIS_USER_ID: distinguished_name_to_user(THIS_USER_ID)
63+
}
64+
else:
65+
sample_db = content
66+
save_user_db(sample_db, db_path)
67+
return sample_db
68+
69+
def before_each(self):
70+
"""Set up test configuration and reset user DB paths"""
71+
ensure_dirs_exist(self.configuration.user_db_home)
72+
ensure_dirs_exist(self.configuration.mig_server_home)
73+
self.user_db_path = os.path.join(self.configuration.user_db_home,
74+
"MiG-users.db")
75+
self.legacy_db_path = os.path.join(self.configuration.mig_server_home,
76+
"MiG-users.db")
77+
78+
# Clear any existing test DBs
79+
if os.path.exists(self.user_db_path):
80+
delete_file(self.user_db_path, self.logger)
81+
if os.path.exists(self.legacy_db_path):
82+
delete_file(self.legacy_db_path, self.logger)
83+
84+
# Make empty test DBs
85+
self._create_sample_db(content={}, db_path=self.user_db_path)
86+
self._create_sample_db(content={}, db_path=self.legacy_db_path)
87+
88+
def test_default_db_path(self):
89+
"""Test default_db_path returns correct path structure"""
90+
expected = os.path.join(self.configuration.user_db_home,
91+
"MiG-users.db")
92+
result = default_db_path(self.configuration)
93+
self.assertEqual(result, expected)
94+
95+
# Test legacy path fallback
96+
self.configuration.user_db_home = '/no-such-dir'
97+
expected_legacy = os.path.join(self.configuration.mig_server_home,
98+
"MiG-users.db")
99+
result = default_db_path(self.configuration)
100+
self.assertEqual(result, expected_legacy)
101+
102+
def test_lock_unlock_user_db(self):
103+
"""Test lock/unlock cycle for user database"""
104+
# Exclusive locking
105+
flock = lock_user_db(self.user_db_path, exclusive=True)
106+
self.assertTrue(flock is not None)
107+
self.assertTrue(flock.readable)
108+
self.assertTrue(flock.writable)
109+
110+
# Unlock exclusive
111+
unlock_user_db(flock)
112+
113+
# Shared locking
114+
flock = lock_user_db(self.user_db_path, exclusive=False)
115+
self.assertTrue(flock is not None)
116+
self.assertTrue(flock.readable)
117+
# TODO: expose this attribue in the backend and enable next
118+
# self.assertFalse(flock.writable)
119+
120+
# Unlock shared
121+
unlock_user_db(flock)
122+
123+
def test_load_user_db(self):
124+
"""Test loading valid and empty user databases"""
125+
# Empty DB creation
126+
empty_db = {}
127+
save_user_db(empty_db, self.user_db_path)
128+
try:
129+
loaded = load_user_db(self.user_db_path)
130+
except Exception as exc:
131+
loaded = None
132+
self.assertEqual(loaded, empty_db)
133+
134+
# Verify proper loading
135+
sample_db = self._create_sample_db()
136+
try:
137+
loaded = load_user_db(self.user_db_path)
138+
except Exception as exc:
139+
loaded = None
140+
self.assertEqual(loaded, sample_db)
141+
142+
# Test missing DB load
143+
db_path = os.path.join(
144+
self.configuration.user_db_home, "no-such-db.db")
145+
try:
146+
loaded = load_user_db(db_path)
147+
except Exception as exc:
148+
loaded = None
149+
self.assertEqual(loaded, None)
150+
151+
def test_save_user_db(self):
152+
"""Test saving user database content"""
153+
sample_db = self._create_sample_db()
154+
try:
155+
loaded = load_user_db(self.user_db_path)
156+
except Exception as exc:
157+
loaded = None
158+
self.assertEqual(sample_db, loaded)
159+
160+
# Update DB
161+
sample_db["user3"] = {"field": "value3"}
162+
save_user_db(sample_db, self.user_db_path)
163+
try:
164+
reloaded = load_user_db(self.user_db_path)
165+
except Exception as exc:
166+
reloaded = None
167+
self.assertEqual(reloaded, sample_db)
168+
169+
def test_load_user_dict_missing(self):
170+
"""Test loading non-existent user from DB"""
171+
self._create_sample_db()
172+
try:
173+
loaded = load_user_dict(self.logger, "no-such-user",
174+
self.user_db_path)
175+
except Exception as exc:
176+
loaded = None
177+
self.assertIsNone(loaded)
178+
179+
def test_load_user_dict_existing(self):
180+
"""Test loading existing user from DB"""
181+
sample_db = self._create_sample_db()
182+
try:
183+
test_user_data = load_user_dict(self.logger, TEST_USER_ID,
184+
self.user_db_path)
185+
except Exception as exc:
186+
test_user_data = None
187+
self.assertEqual(test_user_data, sample_db[TEST_USER_ID])
188+
189+
def test_save_user_dict_new_user(self):
190+
"""Test saving new user to database"""
191+
other_user = distinguished_name_to_user(OTHER_USER_ID)
192+
save_status = save_user_dict(self.logger, OTHER_USER_ID,
193+
other_user, self.user_db_path)
194+
self.assertTrue(save_status)
195+
196+
try:
197+
loaded = load_user_db(self.user_db_path)
198+
except Exception as exc:
199+
loaded = None
200+
self.assertEqual(loaded[OTHER_USER_ID], other_user)
201+
202+
def test_save_user_dict_update(self):
203+
"""Test updating existing user in database"""
204+
sample_db = self._create_sample_db()
205+
changed = distinguished_name_to_user(THIS_USER_ID)
206+
changed.update({"Organization": "UPDATED", "new_field": "ADDED"})
207+
save_status = save_user_dict(self.logger, THIS_USER_ID,
208+
changed, self.user_db_path)
209+
self.assertTrue(save_status)
210+
211+
try:
212+
loaded = load_user_db(self.user_db_path)
213+
except Exception as exc:
214+
loaded = None
215+
self.assertEqual(loaded[THIS_USER_ID], changed)
216+
217+
def test_update_user_dict(self):
218+
"""Test update_user_dict with partial changes"""
219+
sample_db = self._create_sample_db()
220+
updated = update_user_dict(self.logger, THIS_USER_ID,
221+
{"Organization": "CHANGED"},
222+
self.user_db_path)
223+
self.assertEqual(updated["Organization"], "CHANGED")
224+
225+
try:
226+
full_db = load_user_db(self.user_db_path)
227+
except Exception as exc:
228+
full_db = None
229+
self.assertEqual(full_db[THIS_USER_ID]["Organization"], "CHANGED")
230+
231+
def test_update_user_dict_requirements(self):
232+
"""Test update_user_dict with invalid user ID"""
233+
self.logger.forgive_errors()
234+
try:
235+
result = update_user_dict(self.logger, "no-such-user",
236+
{"field": "test"}, self.user_db_path)
237+
except Exception as exc:
238+
result = None
239+
self.assertIsNone(result)
240+
241+
# TODO: adjust API to allow enabling the next test
242+
@unittest.skipIf(True, "requires locking fix")
243+
def test_concurrent_load_save(self):
244+
"""Test concurrent access protection through locking"""
245+
# First thread acquires exclusive lock
246+
flock1 = lock_user_db(self.user_db_path)
247+
self.assertIsNotNone(flock1)
248+
249+
# Second thread trying to lock should block
250+
self._create_sample_db()
251+
252+
def delayed_load():
253+
try:
254+
loaded = load_user_db(self.user_db_path)
255+
except Exception as exc:
256+
loaded = None
257+
return loaded
258+
259+
import threading
260+
delayed_thread = threading.Thread(target=delayed_load)
261+
delayed_thread.start()
262+
time.sleep(0.2)
263+
self.assertTrue(delayed_thread.is_alive())
264+
265+
# Release first lock and verify second completes
266+
unlock_user_db(flock1)
267+
delayed_thread.join(1.0)
268+
self.assertFalse(delayed_thread.is_alive())
269+
270+
def test_pickle_roundtrip(self):
271+
"""Verify pickle serialization compatibility"""
272+
orig_db = self._create_sample_db()
273+
with open(self.user_db_path, "rb") as fh:
274+
pickled = fh.read()
275+
loaded = loads(pickled)
276+
self.assertEqual(orig_db, loaded)
277+
278+
279+
if __name__ == '__main__':
280+
testmain()

0 commit comments

Comments
 (0)