Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 118 additions & 68 deletions tests/test_delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ async def test_delete_model_by_id_not_found(db: AsyncSession, crud_ins: CRUDPlus
async def test_delete_model_with_flush(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
item = sample_ins[1]

async with db.begin():
count = await crud_ins.delete_model(db, item.id, flush=True)
count = await crud_ins.delete_model(db, item.id, flush=True)

assert count == 1

Expand All @@ -52,10 +51,15 @@ async def test_delete_model_with_commit(db: AsyncSession, sample_ins: list[Ins],
async def test_delete_model_by_column_basic(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
item = sample_ins[3]

async with db.begin():
count = await crud_ins.delete_model_by_column(db, allow_multiple=True, name=item.name)
before_exists = await crud_ins.exists(db, name=item.name)
assert before_exists is True

assert count >= 0
count = await crud_ins.delete_model_by_column(db, allow_multiple=True, commit=True, name=item.name)

assert count > 0

after_exists = await crud_ins.exists(db, name=item.name)
assert after_exists is False


@pytest.mark.asyncio
Expand All @@ -68,29 +72,45 @@ async def test_delete_model_by_column_not_found(db: AsyncSession, crud_ins: CRUD

@pytest.mark.asyncio
async def test_delete_model_by_column_allow_multiple(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
async with db.begin():
count = await crud_ins.delete_model_by_column(db, allow_multiple=True, is_deleted=False)
before_count = await crud_ins.count(db, is_deleted=False)
assert before_count > 0

assert count >= 0
count = await crud_ins.delete_model_by_column(db, allow_multiple=True, commit=True, is_deleted=False)

assert count == before_count

after_count = await crud_ins.count(db, is_deleted=False)
assert after_count == 0


@pytest.mark.asyncio
async def test_delete_model_by_column_with_flush(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
item = sample_ins[4]

async with db.begin():
count = await crud_ins.delete_model_by_column(db, allow_multiple=True, flush=True, name=item.name)
before_exists = await crud_ins.exists(db, name=item.name)
assert before_exists is True

assert count >= 0
count = await crud_ins.delete_model_by_column(db, allow_multiple=True, commit=True, name=item.name)

assert count > 0

after_exists = await crud_ins.exists(db, name=item.name)
assert after_exists is False


@pytest.mark.asyncio
async def test_delete_model_by_column_with_commit(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
item = sample_ins[5]

before_exists = await crud_ins.exists(db, name=item.name)
assert before_exists is True

count = await crud_ins.delete_model_by_column(db, allow_multiple=True, commit=True, name=item.name)

assert count >= 0
assert count > 0

after_exists = await crud_ins.exists(db, name=item.name)
assert after_exists is False


@pytest.mark.asyncio
Expand All @@ -104,7 +124,9 @@ async def test_delete_model_by_column_no_filters_error(db: AsyncSession, crud_in
async def test_delete_model_by_column_multiple_results_error(
db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]
):
with pytest.raises(Exception):
from sqlalchemy_crud_plus.errors import MultipleResultsError

with pytest.raises(MultipleResultsError):
async with db.begin():
await crud_ins.delete_model_by_column(db, is_deleted=False)

Expand All @@ -130,20 +152,22 @@ async def test_logical_delete_single_record(db: AsyncSession, sample_ins: list[I

@pytest.mark.asyncio
async def test_logical_delete_multiple_records(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
async with db.begin():
count = await crud_ins.delete_model_by_column(
db,
logical_deletion=True,
deleted_flag_column='is_deleted',
allow_multiple=True,
is_deleted=False,
)
before_count = await crud_ins.count(db, is_deleted=False)
assert before_count > 0

assert count >= 0
count = await crud_ins.delete_model_by_column(
db,
logical_deletion=True,
deleted_flag_column='is_deleted',
allow_multiple=True,
commit=True,
is_deleted=False,
)

async with db.begin():
remaining_false = await crud_ins.select_models(db, is_deleted=False)
assert len(remaining_false) >= 0
assert count == before_count

remaining_false = await crud_ins.select_models(db, is_deleted=False)
assert len(remaining_false) == 0


@pytest.mark.asyncio
Expand All @@ -164,37 +188,36 @@ async def test_logical_delete_with_custom_column(db: AsyncSession, sample_ins: l

@pytest.mark.asyncio
async def test_logical_delete_with_filters(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
async with db.begin():
count = await crud_ins.delete_model_by_column(
db,
logical_deletion=True,
deleted_flag_column='is_deleted',
allow_multiple=True,
name__like='item_%',
id__gt=3,
)
await crud_ins.count(db, name__like='item_%', id__gt=3, is_deleted=False)

assert count >= 0
count = await crud_ins.delete_model_by_column(
db,
logical_deletion=True,
deleted_flag_column='is_deleted',
allow_multiple=True,
commit=True,
name__like='item_%',
id__gt=3,
)

async with db.begin():
deleted_items = await crud_ins.select_models(db, name__like='item_%', id__gt=3, is_deleted=True)
assert count >= 0

assert len(deleted_items) >= 0
deleted_items = await crud_ins.select_models(db, name__like='item_%', id__gt=3, is_deleted=True)
assert len(deleted_items) >= count


@pytest.mark.asyncio
async def test_logical_delete_with_flush(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
item = sample_ins[2]

async with db.begin():
count = await crud_ins.delete_model_by_column(
db,
logical_deletion=True,
deleted_flag_column='is_deleted',
allow_multiple=False,
flush=True,
id=item.id,
)
count = await crud_ins.delete_model_by_column(
db,
logical_deletion=True,
deleted_flag_column='is_deleted',
allow_multiple=False,
flush=True,
id=item.id,
)

assert count == 1

Expand Down Expand Up @@ -234,20 +257,23 @@ async def test_logical_delete_no_matching_records(db: AsyncSession, crud_ins: CR

@pytest.mark.asyncio
async def test_logical_delete_already_deleted_records(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
async with db.begin():
await crud_ins.delete_model_by_column(
db, logical_deletion=True, deleted_flag_column='is_deleted', allow_multiple=True, id__le=3
)
first_count = await crud_ins.delete_model_by_column(
db, logical_deletion=True, deleted_flag_column='is_deleted', allow_multiple=True, commit=True, id__le=3
)
assert first_count >= 0

async with db.begin():
count = await crud_ins.delete_model_by_column(
db,
logical_deletion=True,
deleted_flag_column='is_deleted',
allow_multiple=True,
id__le=3,
is_deleted=True,
)
already_deleted_count = await crud_ins.count(db, id__le=3, is_deleted=True)
assert already_deleted_count >= first_count

count = await crud_ins.delete_model_by_column(
db,
logical_deletion=True,
deleted_flag_column='is_deleted',
allow_multiple=True,
commit=True,
id__le=3,
is_deleted=True,
)

assert count >= 0

Expand Down Expand Up @@ -295,23 +321,22 @@ async def test_logical_delete_single_but_multiple_found(
async def test_logical_delete_affects_count(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
initial_count = await crud_ins.count(db, is_deleted=False)

to_delete_count = await crud_ins.count(db, id__le=2, is_deleted=False)

deleted_count = await crud_ins.delete_model_by_column(
db,
logical_deletion=True,
deleted_flag_column='is_deleted',
allow_multiple=True,
commit=True,
id__le=2,
is_deleted=False,
)

final_count = await crud_ins.count(db, is_deleted=False)

# 检查是否至少删除了一条记录
assert deleted_count >= 0

# 如果删除了记录,则最终计数应该小于或等于初始计数
if deleted_count > 0:
assert final_count <= initial_count
assert deleted_count == to_delete_count
assert final_count == initial_count - deleted_count


@pytest.mark.asyncio
Expand Down Expand Up @@ -359,7 +384,6 @@ async def test_logical_delete_with_select_models(db: AsyncSession, sample_ins: l

@pytest.mark.asyncio
async def test_delete_model_by_column_with_deleted_at(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
# 使用数据库中存在的项目
item = sample_ins[6]

async with db.begin():
Expand All @@ -378,7 +402,6 @@ async def test_delete_model_by_column_with_deleted_at(db: AsyncSession, sample_i
async def test_delete_model_by_column_without_deleted_at_column(
db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]
):
# 使用数据库中存在的项目
item = sample_ins[7]

async with db.begin():
Expand All @@ -390,3 +413,30 @@ async def test_delete_model_by_column_without_deleted_at_column(
updated_item = await crud_ins.select_model(db, item.id)
assert updated_item is not None
assert updated_item.is_deleted is True


@pytest.mark.asyncio
async def test_delete_model_by_column_with_custom_deleted_at_column(
db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]
):
from datetime import datetime, timezone

item = sample_ins[8]

async with db.begin():
count = await crud_ins.delete_model_by_column(
db,
logical_deletion=True,
deleted_flag_column='is_deleted',
deleted_at_column='updated_time',
deleted_at_factory=datetime.now(timezone.utc),
id=item.id,
)

assert count == 1

async with db.begin():
updated_item = await crud_ins.select_model(db, item.id)
assert updated_item is not None
assert updated_item.is_deleted is True
assert updated_item.updated_time is not None
33 changes: 23 additions & 10 deletions tests/test_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,23 @@ async def test_filter_like(db: AsyncSession, sample_ins: list[Ins], crud_ins: CR
async def test_filter_not_like(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
results = await crud_ins.select_models(db, name__not_like='nonexistent_%')

assert len(results) >= 0
assert len(results) > 0
assert all('nonexistent_' not in r.name for r in results)


@pytest.mark.asyncio
async def test_filter_ilike(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
results = await crud_ins.select_models(db, name__ilike='ITEM_%')

assert len(results) >= 0
assert len(results) > 0
assert all(r.name.lower().startswith('item_') for r in results)


@pytest.mark.asyncio
async def test_filter_not_ilike(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
results = await crud_ins.select_models(db, name__not_ilike='ITEM_%')

assert len(results) >= 0
assert all(not r.name.lower().startswith('item_') for r in results)


@pytest.mark.asyncio
Expand Down Expand Up @@ -156,22 +158,22 @@ async def test_filter_match(db: AsyncSession, sample_ins: list[Ins], crud_ins: C
try:
results = await crud_ins.select_models(db, name__match='item')
assert len(results) >= 0
except Exception:
assert True
except Exception as e:
assert 'match' in str(e).lower() or 'not supported' in str(e).lower()


@pytest.mark.asyncio
async def test_filter_concat(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
results = await crud_ins.select_models(db, name__concat='_test')

assert len(results) >= 0
assert isinstance(results, list)


@pytest.mark.asyncio
async def test_filter_add(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
results = await crud_ins.select_models(db, id__add=1)

assert len(results) >= 0
assert isinstance(results, list)


@pytest.mark.asyncio
Expand Down Expand Up @@ -202,6 +204,13 @@ async def test_filter_mul(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRU
assert len(results) >= 0


@pytest.mark.asyncio
async def test_filter_mul_with_condition(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
results = await crud_ins.select_models(db, id__mul={'value': 2, 'condition': {'gt': 0}})

assert len(results) >= 0


@pytest.mark.asyncio
async def test_filter_rmul(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
results = await crud_ins.select_models(db, id__rmul=3)
Expand Down Expand Up @@ -255,7 +264,9 @@ async def test_filter_rmod(db: AsyncSession, sample_ins: list[Ins], crud_ins: CR
async def test_filter_or_same_field_list_values(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
results = await crud_ins.select_models(db, __or__={'is_deleted': [True, False]})

assert len(results) >= 0
total_count = await crud_ins.count(db)
assert len(results) == total_count
assert all(r.is_deleted in [True, False] for r in results)


@pytest.mark.asyncio
Expand All @@ -264,14 +275,16 @@ async def test_filter_or_different_fields_single_values(
):
results = await crud_ins.select_models(db, __or__={'is_deleted': True, 'id__gt': 5})

assert len(results) >= 0
assert len(results) > 0
assert all(r.is_deleted is True or r.id > 5 for r in results)


@pytest.mark.asyncio
async def test_filter_or_with_operators(db: AsyncSession, sample_ins: list[Ins], crud_ins: CRUDPlus[Ins]):
results = await crud_ins.select_models(db, __or__={'name__like': 'item_%', 'id__lt': 3})

assert len(results) >= 0
assert len(results) > 0
assert all('item_' in r.name or r.id < 3 for r in results)


@pytest.mark.asyncio
Expand Down
Loading