From b18f5e8eb17eef9bfda95b38fab4623d7c3f6eba Mon Sep 17 00:00:00 2001 From: Rice Shelley Date: Wed, 3 Dec 2025 16:36:47 -0500 Subject: [PATCH 1/4] Updated test structure and bumped cocotb versions --- .../la_asyncfifo/testbench/la_asyncfifo.py | 155 ------------- .../la_asyncfifo/testbench/tb_la_asyncfifo.py | 187 --------------- lambdalib/utils/_tb_common.py | 76 ------ pyproject.toml | 7 +- tests/cocotb_common.py | 149 ++++++++++++ .../la_asyncfifo/la_asyncfifo_rd_monitor.py | 27 +++ .../la_asyncfifo/la_asyncfifo_wr_driver.py | 75 ++++++ .../ramlib/la_asyncfifo/test_la_asyncfifo.py | 218 ++++++++++++++++++ 8 files changed, 474 insertions(+), 420 deletions(-) delete mode 100644 lambdalib/ramlib/la_asyncfifo/testbench/la_asyncfifo.py delete mode 100644 lambdalib/ramlib/la_asyncfifo/testbench/tb_la_asyncfifo.py delete mode 100644 lambdalib/utils/_tb_common.py create mode 100644 tests/cocotb_common.py create mode 100644 tests/ramlib/la_asyncfifo/la_asyncfifo_rd_monitor.py create mode 100644 tests/ramlib/la_asyncfifo/la_asyncfifo_wr_driver.py create mode 100644 tests/ramlib/la_asyncfifo/test_la_asyncfifo.py diff --git a/lambdalib/ramlib/la_asyncfifo/testbench/la_asyncfifo.py b/lambdalib/ramlib/la_asyncfifo/testbench/la_asyncfifo.py deleted file mode 100644 index 0a42a2ad..00000000 --- a/lambdalib/ramlib/la_asyncfifo/testbench/la_asyncfifo.py +++ /dev/null @@ -1,155 +0,0 @@ -import cocotb -from cocotb.triggers import RisingEdge -from cocotb_bus.bus import Bus -from cocotb.queue import Queue -from cocotb.binary import BinaryValue - - -class LaAsyncFifoWrBus(Bus): - """Cocotb bus for lambdalib async FIFO WR interface""" - - _signals = ["wr_din", "wr_en", "wr_full"] - _optional_signals = ["wr_almost_full", "wr_chaosmode"] - - def __init__(self, entity=None, prefix=None, **kwargs): - super().__init__( - entity, - prefix, - self._signals, - optional_signals=self._optional_signals, - **kwargs - ) - - @classmethod - def from_entity(cls, entity, **kwargs): - return cls(entity, **kwargs) - - @classmethod - def from_prefix(cls, entity, prefix, **kwargs): - return cls(entity, prefix, **kwargs) - - -class LaAsyncFifoRdBus(Bus): - """Cocotb bus for lambdalib async FIFO RD interface""" - - _signals = ["rd_dout", "rd_en", "rd_empty"] - _optional_signals = [] - - def __init__(self, entity=None, prefix=None, **kwargs): - super().__init__( - entity, - prefix, - self._signals, - optional_signals=self._optional_signals, - **kwargs - ) - - @classmethod - def from_entity(cls, entity, **kwargs): - return cls(entity, **kwargs) - - @classmethod - def from_prefix(cls, entity, prefix, **kwargs): - return cls(entity, prefix, **kwargs) - - -class LaAsyncFifoSource: - """Driver for write side of lambdalib async fifo""" - - def __init__(self, bus: LaAsyncFifoWrBus, clock, reset=None): - self.bus = bus - self.clock = clock - self.reset = reset - - self.queue = Queue() - self.width = len(self.bus.wr_din) - - self._run_cr = cocotb.start_soon(self._run()) - - self.bus.wr_chaosmode.value = 0 - self.bus.wr_en.value = 0 - - self.wr_en_generator = None - - def set_wr_en_generator(self, generator): - self.wr_en_generator = generator - - async def send(self, data): - await self.queue.put(data) - - async def wait_until_idle(self): - while not self.queue.empty() or self.bus.wr_en.value == 1: - await RisingEdge(self.clock) - - async def _run(self): - clock_edge_event = RisingEdge(self.clock) - - bus_val = None - - while True: - await clock_edge_event - - fifo_full = self.bus.wr_full.value - wr_en = self.bus.wr_en.value - - if bus_val is None: - bus_val = await self.queue.get() - elif wr_en and not fifo_full: - bus_val = None if self.queue.empty() else self.queue.get_nowait() - - if bus_val is None: - self.bus.wr_en.value = 0 - else: - self.bus.wr_en.value = next(self.wr_en_generator) if self.wr_en_generator else 1 - - self.bus.wr_din.value = bus_val if bus_val else 0 - - -class LaAsyncFifoSink: - """Driver for read side of lambdalib async fifo""" - - def __init__(self, bus: LaAsyncFifoRdBus, clock, reset=None): - self.bus = bus - self.clock = clock - self.reset = reset - - self.queue = Queue() - self.width = len(self.bus.rd_dout) - - self._run_cr = cocotb.start_soon(self._run()) - - self.bus.rd_en.value = 0 - - self.rd_en_generator = None - self._pause = False - - def set_rd_en_generator(self, generator): - self.rd_en_generator = generator - - def pause(self): - self.bus.rd_en.value = 0 - self._pause = True - - def resume(self): - self._pause = False - - async def read(self) -> BinaryValue: - return await self.queue.get() - - async def _run(self): - clock_edge_event = RisingEdge(self.clock) - - while True: - await clock_edge_event - - fifo_empty = self.bus.rd_empty.value - - if self._pause: - self.bus.rd_en.value = 0 - elif self.rd_en_generator: - self.bus.rd_en.value = next(self.rd_en_generator) - else: - self.bus.rd_en.value = 1 - - if self.bus.rd_en.value and not fifo_empty: - self.queue.put_nowait(self.bus.rd_dout.value) diff --git a/lambdalib/ramlib/la_asyncfifo/testbench/tb_la_asyncfifo.py b/lambdalib/ramlib/la_asyncfifo/testbench/tb_la_asyncfifo.py deleted file mode 100644 index c704ee92..00000000 --- a/lambdalib/ramlib/la_asyncfifo/testbench/tb_la_asyncfifo.py +++ /dev/null @@ -1,187 +0,0 @@ -import random -from decimal import Decimal - -import siliconcompiler - -import cocotb -from cocotb.clock import Clock -from cocotb.triggers import ClockCycles, Timer, Combine -from cocotb.regression import TestFactory -from cocotb import utils - -from lambdalib import ramlib -from lambdalib.utils._tb_common import ( - run_cocotb, - drive_reset, - random_bool_generator -) -from lambdalib.ramlib.la_asyncfifo.testbench.la_asyncfifo import ( - LaAsyncFifoWrBus, - LaAsyncFifoRdBus, - LaAsyncFifoSource, - LaAsyncFifoSink -) - - -def bursty_en_gen(burst_len=20): - while True: - en_state = (random.randint(0, 1) == 1) - for _ in range(0, burst_len): - yield en_state - - -@cocotb.test() -async def test_almost_full(dut): - - wr_clk_period_ns = 10.0 - rd_clk_period_ns = 10.0 - - fifo_source = LaAsyncFifoSource( - bus=LaAsyncFifoWrBus.from_prefix(dut, ""), - clock=dut.wr_clk, - reset=dut.wr_nreset - ) - - fifo_sink = LaAsyncFifoSink( - bus=LaAsyncFifoRdBus.from_prefix(dut, ""), - clock=dut.rd_clk, - reset=dut.rd_nreset - ) - fifo_sink.pause() - - # Reset DUT - await Combine( - cocotb.start_soon(drive_reset(dut.wr_nreset)), - cocotb.start_soon(drive_reset(dut.rd_nreset)) - ) - - await cocotb.start(Clock(dut.wr_clk, wr_clk_period_ns, units="ns").start()) - # Randomize phase shift between clocks - await Timer(wr_clk_period_ns * random.random(), "ns", round_mode="round") - await cocotb.start(Clock(dut.rd_clk, rd_clk_period_ns, units="ns").start()) - - almost_full_level = int(dut.AFULLFINAL.value) - - await ClockCycles(dut.wr_clk, 3) - - # Almost full should be low before writing to FIFO - assert dut.wr_almost_full.value == 0 - - # Write to FIFO - for i in range(0, almost_full_level): - await fifo_source.send(i) - - # Wait for write to complete - await fifo_source.wait_until_idle() - assert dut.wr_almost_full.value == 1 - - # Read one value out of FIFO - fifo_sink.resume() - await fifo_sink.read() - fifo_sink.pause() - - # Check that almost full is lowered - await ClockCycles(dut.wr_clk, 4) - assert dut.wr_almost_full.value == 0 - - -async def fifo_rd_wr_test( - dut, - wr_clk_period_ns=10.0, - rd_clk_period_ns=10.0, - wr_en_generator=None, - rd_en_generator=None -): - - fifo_source = LaAsyncFifoSource( - bus=LaAsyncFifoWrBus.from_prefix(dut, ""), - clock=dut.wr_clk, - reset=dut.wr_nreset - ) - fifo_source.set_wr_en_generator(random_bool_generator()) - - fifo_sink = LaAsyncFifoSink( - bus=LaAsyncFifoRdBus.from_prefix(dut, ""), - clock=dut.rd_clk, - reset=dut.rd_nreset - ) - fifo_sink.set_rd_en_generator(random_bool_generator()) - - # Reset DUT - await Combine( - cocotb.start_soon(drive_reset(dut.wr_nreset)), - cocotb.start_soon(drive_reset(dut.rd_nreset)) - ) - - await cocotb.start(Clock(dut.wr_clk, wr_clk_period_ns, units="ns").start()) - # Randomize phase shift between clocks - await Timer(wr_clk_period_ns * random.random(), "ns", round_mode="round") - await cocotb.start(Clock(dut.rd_clk, rd_clk_period_ns, units="ns").start()) - - await ClockCycles(dut.wr_clk, 3) - - expected_len = 256 - expected = [random.getrandbits(len(dut.wr_din)) for _ in range(expected_len)] - - for val in expected: - await fifo_source.send(val) - - actual = [await fifo_sink.read() for _ in range(expected_len)] - - assert actual == expected - - await ClockCycles(dut.wr_clk, 10) - - -# Generate sets of tests based on the different permutations of the possible arguments to fifo_test -MAX_PERIOD_NS = 10.0 -MIN_PERIOD_NS = 1.0 -# Generate random clk period to test between min and max -RAND_WR_CLK_PERIOD_NS, RAND_RD_CLK_PERIOD_NS = [utils.get_time_from_sim_steps( - # Time step must be even for cocotb clock driver - steps=utils.get_sim_steps( - time=Decimal(MIN_PERIOD_NS) + ( - Decimal(MAX_PERIOD_NS - MIN_PERIOD_NS) - * Decimal(random.random()).quantize(Decimal("0.00")) - ), - units="ns", - round_mode="round" - ) & ~1, - units="ns" -) for _ in range(0, 2)] - -# Factory to automatically generate a set of tests based on the different permutations -# of the provided test arguments -tf = TestFactory(fifo_rd_wr_test) -tf.add_option('wr_clk_period_ns', [MIN_PERIOD_NS, RAND_WR_CLK_PERIOD_NS, MAX_PERIOD_NS]) -tf.add_option('rd_clk_period_ns', [MIN_PERIOD_NS, RAND_RD_CLK_PERIOD_NS, MAX_PERIOD_NS]) -tf.add_option('wr_en_generator', [None, random_bool_generator, bursty_en_gen]) -tf.add_option('rd_en_generator', [None, random_bool_generator, bursty_en_gen]) -tf.generate_tests() - - -def test_la_asyncfifo(): - chip = siliconcompiler.Chip("la_asyncfifo") - - # TODO: should not be needed? - chip.input("ramlib/la_asyncfifo/rtl/la_asyncfifo.v", package='lambdalib') - chip.use(ramlib) - - for depth in [2, 4, 8]: - test_module_name = "lambdalib.ramlib.tests.tb_la_asyncfifo" - test_name = f"{test_module_name}_depth_{depth}" - tests_failed = run_cocotb( - chip=chip, - test_module_name=test_module_name, - timescale=("1ns", "1ps"), - parameters={ - "DW": 32, - "DEPTH": depth - }, - output_dir_name=test_name - ) - assert (tests_failed == 0), f"Error test {test_name} failed!" - - -if __name__ == "__main__": - test_la_asyncfifo() diff --git a/lambdalib/utils/_tb_common.py b/lambdalib/utils/_tb_common.py deleted file mode 100644 index 62e5867b..00000000 --- a/lambdalib/utils/_tb_common.py +++ /dev/null @@ -1,76 +0,0 @@ -import os - -import random -from pathlib import Path - -from cocotb.triggers import Timer -from cocotb.runner import get_runner, get_results - -import siliconcompiler -from siliconcompiler.tools.slang import elaborate - - -def run_cocotb( - chip: siliconcompiler.Chip, - test_module_name, - output_dir_name=None, - simulator_name="icarus", - timescale=None, - parameters=None): - - # Use surelog to pickle Verilog sources - flow = "cocotb_flow" - chip.node(flow, "import", elaborate) - chip.set("option", "flow", flow) - assert chip.run() - - pickled_verilog = chip.find_result("v", "import") - assert pickled_verilog, "Could not locate pickled verilog" - - if output_dir_name is None: - output_dir_name = test_module_name - - top_level_dir = os.getcwd() - build_dir = Path(chip.getbuilddir()) / output_dir_name - test_dir = None - - results_xml = None - # Need to check if we are running inside of pytest. See link below. - # https://github.com/cocotb/cocotb/blob/d883ce914063c3601455d10a40f459fffa22d8f2/cocotb/runner.py#L313 - if not os.getenv("PYTEST_CURRENT_TEST", None): - results_xml = build_dir / "results.xml" - test_dir = top_level_dir - - # Build HDL in chosen simulator - runner = get_runner(simulator_name) - runner.build( - sources=[pickled_verilog], - hdl_toplevel=chip.design, - waves=True, - timescale=timescale, - build_dir=build_dir, - parameters=parameters - ) - # Run test - _, tests_failed = get_results(runner.test( - hdl_toplevel=chip.top(), - test_module=test_module_name, - test_dir=test_dir, - results_xml=results_xml, - build_dir=build_dir, - timescale=timescale, - waves=True - )) - - return tests_failed - - -async def drive_reset(reset, reset_time_in_ns=100, active_level=False): - reset.value = 1 if active_level else 0 - await Timer(reset_time_in_ns, units="ns") - reset.value = 0 if active_level else 1 - - -def random_bool_generator(): - while True: - yield (random.randint(0, 1) == 1) diff --git a/pyproject.toml b/pyproject.toml index 7c5188c5..8e6441ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,14 +32,17 @@ dev = [ "flake8 == 7.3.0", "pytest == 8.4.2", "pytest-timeout == 2.4.0", - "cocotb == 1.9.2", - "cocotb-bus == 0.2.1" + "cocotb == 2.0.1", + "cocotb-bus == 0.3.0" ] [tool.pytest.ini_options] markers = [ "eda: this test requires EDA tools installed to run.", ] +pythonpath = [ + "tests" +] testpaths = [ "tests" ] diff --git a/tests/cocotb_common.py b/tests/cocotb_common.py new file mode 100644 index 00000000..c3ce62b0 --- /dev/null +++ b/tests/cocotb_common.py @@ -0,0 +1,149 @@ +import os +import math +import random +import string +from decimal import Decimal + +from pathlib import Path +from typing import List, Tuple, Optional, Mapping, Union + +from siliconcompiler import Sim + +from cocotb.triggers import Timer +from cocotb.handle import SimHandleBase + +from cocotb_tools.runner import get_runner, VerilatorControlFile +from cocotb_tools.check_results import get_results + + +async def do_reset( + reset: SimHandleBase, + time_ns: int, + active_level: bool = False): + """Perform a async reset""" + reset.value = not active_level + await Timer(1, unit="step") + reset.value = active_level + await Timer(time_ns, "ns") + reset.value = not active_level + await Timer(1, unit="step") + + +def run_cocotb( + project: Sim, + test_module_name: str, + output_dir_name: Optional[str] = None, + simulator_name: str = "icarus", + build_args: Optional[List] = None, + timescale: Optional[Tuple[str, str]] = None, + parameters: Optional[Mapping[str, object]] = None, + seed: Optional[Union[str, int]] = None, + waves: bool = True): + """Launch cocotb given a SC Project""" + + if parameters is None: + parameters = {} + + if build_args is None: + build_args = [] + + if output_dir_name is None: + output_dir_name = test_module_name + + pytest_current_test = os.getenv("PYTEST_CURRENT_TEST", None) + + rootpath = Path(__file__).resolve().parent.parent + top_level_dir = rootpath + build_dir = rootpath / "build" / output_dir_name + test_dir = None + + results_xml = None + if not pytest_current_test: + results_xml = build_dir / "results.xml" + test_dir = top_level_dir + + # Get top level module name + top_lvl_module_name = None + main_filesets = project.get("option", "fileset") + if main_filesets and len(main_filesets) != 0: + main_fileset = main_filesets[0] + top_lvl_module_name = project.design.get_topmodule( + fileset=main_fileset + ) + + filesets = project.get_filesets() + idirs = [] + defines = [] + for lib, fileset in filesets: + idirs.extend(lib.find_files("fileset", fileset, "idir")) + defines.extend(lib.get("fileset", fileset, "define")) + + sources = [] + for lib, fileset in filesets: + for value in lib.get_file(fileset=fileset, filetype="systemverilog"): + sources.append(value) + for lib, fileset in filesets: + for value in lib.get_file(fileset=fileset, filetype="verilog"): + sources.append(value) + + vlt_files = [] + if simulator_name == "verilator": + for lib, fileset in filesets: + for value in lib.get_file(fileset=fileset, filetype="verilatorctrlfile"): + vlt_files.append(VerilatorControlFile(value)) + + # Build HDL in chosen simulator + runner = get_runner(simulator_name) + runner.build( + sources=vlt_files + sources, + includes=idirs, + hdl_toplevel=top_lvl_module_name, + build_args=build_args, + waves=waves, + timescale=timescale, + build_dir=build_dir, + parameters=parameters + ) + + # Run test + _, tests_failed = get_results(runner.test( + hdl_toplevel=top_lvl_module_name, + test_module=test_module_name, + test_dir=test_dir, + test_args=build_args, + results_xml=results_xml, + build_dir=build_dir, + seed=seed, + waves=waves + )) + + return tests_failed + + +def random_decimal(max: int, min: int, decimal_places=2) -> Decimal: + prefix = str(random.randint(min, max)) + suffix = ''.join(random.choice(string.digits) for _ in range(decimal_places)) + return Decimal(prefix + "." + suffix) + + +def random_toggle_generator(on_range=(0, 15), off_range=(0, 15)): + return bit_toggler_generator( + gen_on=(random.randint(*on_range) for _ in iter(int, 1)), + gen_off=(random.randint(*off_range) for _ in iter(int, 1)) + ) + + +def sine_wave_generator(amplitude, w, offset=0): + while True: + for idx in (i / float(w) for i in range(int(w))): + yield amplitude * math.sin(2 * math.pi * idx) + offset + + +def bit_toggler_generator(gen_on, gen_off): + for n_on, n_off in zip(gen_on, gen_off): + yield int(abs(n_on)), int(abs(n_off)) + + +def wave_generator(on_ampl=30, on_freq=200, off_ampl=10, off_freq=100): + return bit_toggler_generator(sine_wave_generator(on_ampl, on_freq), + sine_wave_generator(off_ampl, off_freq)) diff --git a/tests/ramlib/la_asyncfifo/la_asyncfifo_rd_monitor.py b/tests/ramlib/la_asyncfifo/la_asyncfifo_rd_monitor.py new file mode 100644 index 00000000..3cf9683b --- /dev/null +++ b/tests/ramlib/la_asyncfifo/la_asyncfifo_rd_monitor.py @@ -0,0 +1,27 @@ +from cocotb.triggers import RisingEdge +from cocotb_bus.monitors import BusMonitor + + +class LaAsyncFifoRdMonitor(BusMonitor): + """Monitor for read side of lambdalib async fifo""" + + _signals = [ + "rd_dout", + "rd_en", + "rd_empty" + ] + _optional_signals = [] + + def __init__(self, entity, name, clock, **kwargs): + BusMonitor.__init__(self, entity, name, clock, **kwargs) + + async def _monitor_recv(self): + clk_re = RisingEdge(self.clock) + + def valid_handshake(): + return bool(self.bus.rd_en.value) and not bool(self.bus.rd_empty.value) + + while True: + await clk_re + if valid_handshake(): + self._recv(self.bus.rd_dout.value.to_unsigned()) diff --git a/tests/ramlib/la_asyncfifo/la_asyncfifo_wr_driver.py b/tests/ramlib/la_asyncfifo/la_asyncfifo_wr_driver.py new file mode 100644 index 00000000..0f542406 --- /dev/null +++ b/tests/ramlib/la_asyncfifo/la_asyncfifo_wr_driver.py @@ -0,0 +1,75 @@ +from typing import Any + +from cocotb.triggers import RisingEdge +from cocotb.handle import SimHandleBase + +from cocotb_bus.drivers import ValidatedBusDriver + + +class LaAsyncFifoWrDriver(ValidatedBusDriver): + """Driver for write side of lambdalib async fifo""" + + _signals = [ + "wr_din", + "wr_en", + "wr_full" + ] + _optional_signals = [ + "wr_almost_full", + "wr_chaosmode" + ] + + def __init__( + self, + entity: SimHandleBase, + name: str, + clock: SimHandleBase, + *, + config={}, + **kwargs: Any + ): + ValidatedBusDriver.__init__(self, entity, name, clock, **kwargs) + + self.clock = clock + self.bus.wr_en.value = 0 + + async def _driver_send( + self, + transaction: int, + sync: bool = True + ) -> None: + """Implementation for BusDriver. + Args: + transaction: The transaction to send. + sync: Synchronize the transfer by waiting for a rising edge. + """ + + clk_re = RisingEdge(self.clock) + + if sync: + await clk_re + + # Insert a gap where valid is low + if not self.on: + self.bus.wr_en.value = 0 + for _ in range(self.off): + await clk_re + + # Grab the next set of on/off values + self._next_valids() + + # Consume a valid cycle + if self.on is not True and self.on: + self.on -= 1 + + def ready() -> bool: + return not bool(self.bus.wr_full.value) + + while True: + self.bus.wr_en.value = 1 + self.bus.wr_din.value = transaction + await clk_re + if ready(): + break + + self.bus.wr_en.value = 0 diff --git a/tests/ramlib/la_asyncfifo/test_la_asyncfifo.py b/tests/ramlib/la_asyncfifo/test_la_asyncfifo.py new file mode 100644 index 00000000..197e809e --- /dev/null +++ b/tests/ramlib/la_asyncfifo/test_la_asyncfifo.py @@ -0,0 +1,218 @@ +import random +from decimal import Decimal +import itertools + +import pytest + +import cocotb +from cocotb.clock import Clock +from cocotb.triggers import ClockCycles, Timer, Combine + +from cocotb_bus.scoreboard import Scoreboard +from cocotb_bus.drivers import BitDriver + +from cocotb_common import ( + run_cocotb, + do_reset, + random_decimal, + random_toggle_generator, + wave_generator +) + +from ramlib.la_asyncfifo.la_asyncfifo_wr_driver import LaAsyncFifoWrDriver +from ramlib.la_asyncfifo.la_asyncfifo_rd_monitor import LaAsyncFifoRdMonitor + + +@cocotb.test() +async def test_almost_full(dut): + + wr_clk_period_ns = 10 + rd_clk_period_ns = 10 + + #################################### + # Create BFMs + #################################### + fifo_source = LaAsyncFifoWrDriver( + entity=dut, + name="", + clock=dut.wr_clk + ) + + fifo_sink = LaAsyncFifoRdMonitor( + entity=dut, + name="", + clock=dut.rd_clk, + reset_n=dut.rd_nreset + ) + + dut.rd_en.value = 0 + + #################################### + # Handle clocking / resets + #################################### + + # Reset DUT + await Combine( + cocotb.start_soon(do_reset(dut.wr_nreset, time_ns=wr_clk_period_ns*5)), + cocotb.start_soon(do_reset(dut.rd_nreset, time_ns=rd_clk_period_ns*5)) + ) + + Clock(dut.wr_clk, wr_clk_period_ns, unit="ns").start() + + # Randomize phase shift between clocks + await Timer(wr_clk_period_ns * Decimal(random.random()), "ns", round_mode="round") + + Clock(dut.rd_clk, rd_clk_period_ns, unit="ns").start() + + almost_full_level = int(dut.AFULLFINAL.value) + + await ClockCycles(dut.wr_clk, 3) + + #################################### + # Test DUT + #################################### + + # Almost full should be low before writing to FIFO + assert bool(dut.wr_almost_full.value) is False + + # Write to FIFO + for i in range(0, almost_full_level): + await fifo_source.send(i) + + await ClockCycles(dut.wr_clk, 1) + + assert bool(dut.wr_almost_full.value) is True + + # Read one value out of FIFO + dut.rd_en.value = 1 + await fifo_sink.wait_for_recv() + dut.rd_en.value = 0 + + # Check that almost full is lowered + await ClockCycles(dut.wr_clk, 4) + assert bool(dut.wr_almost_full.value) is False + + +# Generate sets of tests based on the different permutations of the possible arguments to fifo_test +MAX_PERIOD_NS = 10 +MIN_PERIOD_NS = 1 +# Generate random clk periods to test between min and max +RAND_WR_CLK_PERIOD_NS = random_decimal(MAX_PERIOD_NS, MIN_PERIOD_NS) +RAND_RD_CLK_PERIOD_NS = random_decimal(MAX_PERIOD_NS, MIN_PERIOD_NS) + + +@cocotb.test() +@cocotb.parametrize( + wr_clk_period_ns=[MIN_PERIOD_NS, RAND_WR_CLK_PERIOD_NS, MAX_PERIOD_NS], + rd_clk_period_ns=[MIN_PERIOD_NS, RAND_RD_CLK_PERIOD_NS, MAX_PERIOD_NS], + wr_en_generator=[None, random_toggle_generator(), wave_generator()], + rd_en_generator=[None, random_toggle_generator(), wave_generator()] +) +async def fifo_rd_wr_test( + dut, + wr_clk_period_ns=10, + rd_clk_period_ns=10, + wr_en_generator=None, + rd_en_generator=None, + test_n_trans=256 +): + + #################################### + # Create BFMs + #################################### + fifo_source = LaAsyncFifoWrDriver( + entity=dut, + name="", + clock=dut.wr_clk, + valid_generator=wr_en_generator + ) + + fifo_sink = LaAsyncFifoRdMonitor( + entity=dut, + name="", + clock=dut.rd_clk, + reset_n=dut.rd_nreset + ) + + # Assign constant or bit driver to rd_en signal + if rd_en_generator is None: + dut.rd_en.value = 1 + else: + BitDriver(signal=dut.rd_en, clk=dut.rd_clk).start(generator=rd_en_generator) + + expected_output = [random.getrandbits(len(dut.wr_din)) for _ in range(test_n_trans)] + scoreboard = Scoreboard(dut, fail_immediately=True) + scoreboard.add_interface(monitor=fifo_sink, expected_output=expected_output) + + #################################### + # Handle clocking / resets + #################################### + + # Reset DUT + await Combine( + cocotb.start_soon(do_reset(dut.wr_nreset, time_ns=wr_clk_period_ns*5)), + cocotb.start_soon(do_reset(dut.rd_nreset, time_ns=rd_clk_period_ns*5)) + ) + + Clock(dut.wr_clk, wr_clk_period_ns, unit="ns").start() + + # Randomize phase shift between clocks + await Timer(wr_clk_period_ns * Decimal(random.random()), "ns", round_mode="round") + + Clock(dut.rd_clk, rd_clk_period_ns, unit="ns").start() + + await ClockCycles(dut.wr_clk, 3) + + #################################### + # Test DUT + #################################### + + # Drive test data into DUT + for trans in expected_output: + fifo_source.append(trans) + + # Wait for scoreboard to consume all expected outputs + while len(expected_output) != 0: + await ClockCycles(dut.rd_clk, 1) + + await ClockCycles(dut.rd_clk, 10) + + # Verify scoreboard results + raise scoreboard.result + + +@pytest.mark.eda +@pytest.mark.parametrize("depth, simulator, output_wave", list(itertools.product( + [2, 4, 8], + ["icarus", "verilator"], + [False] +))) +def test_la_asyncfifo( + depth: int, + simulator: str, + output_wave: bool +): + + from siliconcompiler import Sim + from lambdalib.ramlib import Asyncfifo + + test_inst_name = f"depth_{depth}_sim_{simulator}_output_wave{output_wave}" + + project = Sim(Asyncfifo()) + project.add_fileset("rtl") + + test_module_name = __name__ + test_name = f"{test_module_name}_{test_inst_name}" + tests_failed = run_cocotb( + project=project, + test_module_name=test_module_name, + simulator_name=simulator, + timescale=("1ns", "1ps"), + parameters={ + "DW": 32, + "DEPTH": depth + }, + output_dir_name=test_name, + waves=output_wave + ) + assert (tests_failed == 0), f"Error test {test_name} failed!" From 1acb4840d870648d971532e3b4441300aa874ca3 Mon Sep 17 00:00:00 2001 From: Rice Shelley Date: Wed, 3 Dec 2025 16:42:15 -0500 Subject: [PATCH 2/4] enabled eda_ci --- .github/workflows/ci.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 855bdf52..fa0c2602 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -78,7 +78,6 @@ jobs: eda_ci: name: Run EDA - if: false runs-on: ubuntu-latest container: ghcr.io/siliconcompiler/sc_runner:latest steps: From defb180218bc7957ed06160b493da22206b5412a Mon Sep 17 00:00:00 2001 From: Rice Shelley Date: Wed, 3 Dec 2025 16:47:25 -0500 Subject: [PATCH 3/4] Added pytest-xdist --- .github/workflows/ci.yml | 4 ++-- pyproject.toml | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fa0c2602..9e339de3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -89,10 +89,10 @@ jobs: . .venv/bin/activate pip install --upgrade pip - pip install .[dev] + pip install -e .[dev] # change running directory mkdir testrun cd testrun - pytest $GITHUB_WORKSPACE -m "eda" + pytest $GITHUB_WORKSPACE -n logical -m "eda" --verbose diff --git a/pyproject.toml b/pyproject.toml index 8e6441ed..77a01eeb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ version = {attr = "lambdalib.__version__"} dev = [ "flake8 == 7.3.0", "pytest == 8.4.2", + "pytest-xdist==3.8.0", "pytest-timeout == 2.4.0", "cocotb == 2.0.1", "cocotb-bus == 0.3.0" From cc71a019d3268810b46f2300193cd3f09c59bcd8 Mon Sep 17 00:00:00 2001 From: Rice Shelley Date: Wed, 3 Dec 2025 16:52:55 -0500 Subject: [PATCH 4/4] Increased pytest timeout --- tests/ramlib/la_asyncfifo/test_la_asyncfifo.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/ramlib/la_asyncfifo/test_la_asyncfifo.py b/tests/ramlib/la_asyncfifo/test_la_asyncfifo.py index 197e809e..6297b378 100644 --- a/tests/ramlib/la_asyncfifo/test_la_asyncfifo.py +++ b/tests/ramlib/la_asyncfifo/test_la_asyncfifo.py @@ -182,6 +182,7 @@ async def fifo_rd_wr_test( @pytest.mark.eda +@pytest.mark.timeout(120) @pytest.mark.parametrize("depth, simulator, output_wave", list(itertools.product( [2, 4, 8], ["icarus", "verilator"],