Skip to content

Commit 09d8ae8

Browse files
wip
1 parent 4bd91fb commit 09d8ae8

File tree

2 files changed

+82
-19
lines changed

2 files changed

+82
-19
lines changed

libudpard/udpard.c

Lines changed: 77 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -470,20 +470,34 @@ static bool header_deserialize(const udpard_bytes_mut_t dgram_payload,
470470
// ---------------------------------------------------------------------------------------------------------------------
471471

472472
/// This may be allocated in the NIC DMA region so we keep overheads tight.
473+
/// An alternative solution is to allocate a flex array of void* pointers, one per fragment, directly in tx_transfer_t,
474+
/// but it might create a bit more memory pressure on average.
473475
typedef struct tx_frame_t
474476
{
475477
struct tx_frame_t* next;
476478
byte_t data[];
477479
} tx_frame_t;
478480

479-
static size_t tx_frame_size(const size_t mtu) { return sizeof(tx_frame_t) + mtu + HEADER_SIZE_BYTES; }
481+
static size_t tx_frame_object_size(const size_t mtu) { return sizeof(tx_frame_t) + mtu + HEADER_SIZE_BYTES; }
480482

483+
typedef struct
484+
{
485+
uint64_t topic_hash;
486+
uint64_t transfer_id;
487+
} tx_transfer_key_t;
488+
489+
/// The transmission scheduler maintains several indexes for the transfers in the pipeline.
490+
/// All index operations are logarithmic in the number of scheduled transfers.
491+
/// The priority index only contains transfers that are ready for transmission (now>=ready_at).
492+
/// The readiness index contains only transfers that are postponed (now<ready_at).
493+
/// The deadline index contains all transfers, ordered by their deadlines, used for purging expired transfers.
494+
/// The transfer index contains all transfers, used for lookup by (topic_hash, transfer_id).
481495
typedef struct tx_transfer_t
482496
{
483-
/// Various indexes this transfer is a member of.
484-
udpard_tree_t index_schedule; ///< Transmission order: next to transmit on the left.
485-
udpard_tree_t index_deadline; ///< Soonest to expire on the left.
486-
udpard_tree_t index_id; ///< Ordered by the topic hash and the transfer-ID.
497+
udpard_tree_t index_priority; ///< Next to transmit on the left. Key: (ready, priority); ready is implicit.
498+
udpard_tree_t index_readiness; ///< Soonest to be ready on the left. Key: ready_at
499+
udpard_tree_t index_deadline; ///< Soonest to expire on the left. Key: deadline
500+
udpard_tree_t index_transfer; ///< Specific transfer lookup for ack management. Key: tx_transfer_key_t
487501

488502
/// We always keep a pointer to the head, plus a cursor that scans the frames during transmission.
489503
/// Both are NULL if the payload is destroyed.
@@ -494,7 +508,7 @@ typedef struct tx_transfer_t
494508

495509
/// Retransmission state.
496510
uint_fast8_t retries;
497-
udpard_us_t next_attempt_at;
511+
udpard_us_t ready_at;
498512

499513
/// All frames except for the last one share the same MTU, so there's no use keeping dedicated size per frame.
500514
size_t mtu;
@@ -516,16 +530,29 @@ static bool tx_validate_mem_resources(const udpard_tx_mem_resources_t memory)
516530
(memory.payload.alloc != NULL) && (memory.payload.free != NULL);
517531
}
518532

519-
/// Frames with identical weight are processed in the FIFO order.
520-
static int32_t tx_cavl_compare_prio(const void* const user, const udpard_tree_t* const node)
533+
static int32_t tx_cavl_compare_priority(const void* const user, const udpard_tree_t* const node)
521534
{
522-
return (((int)*(const udpard_prio_t*)user) >= (int)CAVL2_TO_OWNER(node, tx_frame_t, index_order)->priority) ? +1
523-
: -1;
535+
const udpard_prio_t key = *(const udpard_prio_t*)user;
536+
const tx_transfer_t* const tr = CAVL2_TO_OWNER(node, tx_transfer_t, index_priority);
537+
return (key >= tr->priority) ? +1 : -1; // higher prio is numerically less
538+
}
539+
static int32_t tx_cavl_compare_readiness(const void* const user, const udpard_tree_t* const node)
540+
{
541+
return ((*(const udpard_us_t*)user) >= CAVL2_TO_OWNER(node, tx_transfer_t, index_readiness)->ready_at) ? +1 : -1;
524542
}
525-
526543
static int32_t tx_cavl_compare_deadline(const void* const user, const udpard_tree_t* const node)
527544
{
528-
return ((*(const udpard_us_t*)user) >= CAVL2_TO_OWNER(node, tx_frame_t, index_deadline)->deadline) ? +1 : -1;
545+
return ((*(const udpard_us_t*)user) >= CAVL2_TO_OWNER(node, tx_transfer_t, index_deadline)->deadline) ? +1 : -1;
546+
}
547+
static int32_t tx_cavl_compare_transfer(const void* const user, const udpard_tree_t* const node)
548+
{
549+
const tx_transfer_key_t* const key = (const tx_transfer_key_t*)user;
550+
const tx_transfer_t* const tr = CAVL2_TO_OWNER(node, tx_transfer_t, index_transfer); // clang-format off
551+
if (key->topic_hash < tr->topic_hash) { return -1; }
552+
if (key->topic_hash > tr->topic_hash) { return +1; }
553+
if (key->transfer_id < tr->transfer_id) { return -1; }
554+
if (key->transfer_id > tr->transfer_id) { return +1; }
555+
return 0; // clang-format on
529556
}
530557

531558
/// Returns the head of the transfer chain; NULL on OOM.
@@ -559,7 +586,7 @@ static tx_frame_t* tx_spool(const udpard_tx_mem_resources_t memory,
559586
if (NULL == tail) {
560587
while (head != NULL) {
561588
tx_frame_t* const next = head->next;
562-
mem_free(memory.payload, tx_frame_size((head == tail) ? progress : mtu), head);
589+
mem_free(memory.payload, tx_frame_object_size((head == tail) ? progress : mtu), head);
563590
head = next;
564591
}
565592
break;
@@ -587,7 +614,42 @@ static udpard_us_t tx_ack_timeout(const udpard_us_t baseline, const udpard_prio_
587614
return baseline * (1L << smaller((size_t)prio + retries, UDPARD_TX_RETRY_MAX)); // NOLINT(*-signed-bitwise)
588615
}
589616

617+
static void tx_insert(udpard_tx_t* const tx, tx_transfer_t* const tr, const udpard_us_t now)
618+
{
619+
const bool ready = now >= tr->ready_at;
620+
if (ready) {
621+
if (cavl2_is_inserted(tx->index_readiness, &tr->index_readiness)) {
622+
cavl2_remove(&tx->index_readiness, &tr->index_readiness);
623+
}
624+
if (!cavl2_is_inserted(tx->index_priority, &tr->index_priority)) {
625+
(void)cavl2_find_or_insert(
626+
&tx->index_priority, &tr->priority, tx_cavl_compare_priority, &tr->index_priority, cavl2_trivial_factory);
627+
}
628+
} else {
629+
if (cavl2_is_inserted(tx->index_priority, &tr->index_priority)) {
630+
cavl2_remove(&tx->index_priority, &tr->index_priority);
631+
}
632+
if (!cavl2_is_inserted(tx->index_readiness, &tr->index_readiness)) {
633+
(void)cavl2_find_or_insert(&tx->index_readiness,
634+
&tr->ready_at,
635+
tx_cavl_compare_readiness,
636+
&tr->index_readiness,
637+
cavl2_trivial_factory);
638+
}
639+
}
640+
if (!cavl2_is_inserted(tx->index_transfer, &tr->index_transfer)) {
641+
const tx_transfer_key_t key = { .topic_hash = tr->topic_hash, .transfer_id = tr->transfer_id };
642+
(void)cavl2_find_or_insert(
643+
&tx->index_transfer, &key, tx_cavl_compare_transfer, &tr->index_transfer, cavl2_trivial_factory);
644+
}
645+
if (!cavl2_is_inserted(tx->index_deadline, &tr->index_deadline)) {
646+
(void)cavl2_find_or_insert(
647+
&tx->index_deadline, &tr->deadline, tx_cavl_compare_deadline, &tr->index_deadline, cavl2_trivial_factory);
648+
}
649+
}
650+
590651
static uint32_t tx_push(udpard_tx_t* const tx,
652+
const udpard_us_t now,
591653
const udpard_us_t deadline,
592654
const meta_t meta,
593655
const udpard_udpip_ep_t endpoint,
@@ -607,7 +669,7 @@ static uint32_t tx_push(udpard_tx_t* const tx,
607669
if (tr != NULL) {
608670
mem_zero(sizeof(*tr), tr);
609671
tr->retries = 0;
610-
tr->next_attempt_at = BIG_BANG; // TODO: we can implement time-triggered comms here.
672+
tr->ready_at = BIG_BANG; // We can implement time-triggered comms here.
611673
tr->mtu = mtu;
612674
tr->mtu_last = mtu_last;
613675
tr->topic_hash = meta.topic_hash;
@@ -619,8 +681,7 @@ static uint32_t tx_push(udpard_tx_t* const tx,
619681
tr->user_transfer_reference = user_transfer_reference;
620682
tr->head = tr->cursor = tx_spool(tx->memory, mtu, meta, payload);
621683
if (tr->head != NULL) {
622-
// TODO: insert
623-
// Finalize
684+
tx_insert(tx, tr, now);
624685
tx->queue_size += n_frames;
625686
UDPARD_ASSERT(tx->queue_size <= tx->queue_capacity);
626687
out = (uint32_t)n_frames;

libudpard/udpard.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -418,9 +418,11 @@ struct udpard_tx_t
418418
uint64_t errors_capacity; ///< A transfer could not be enqueued due to queue capacity limit.
419419
uint64_t errors_expiration; ///< A frame had to be dropped due to premature deadline expiration.
420420

421-
/// Internal use only, do not modify!
422-
udpard_tree_t* index_order; ///< Most urgent on the left, then according to the insertion order.
423-
udpard_tree_t* index_deadline; ///< Soonest on the left, then according to the insertion order.
421+
/// Internal use only, do not modify! See tx_transfer_t for details.
422+
udpard_tree_t* index_priority;
423+
udpard_tree_t* index_readiness;
424+
udpard_tree_t* index_deadline;
425+
udpard_tree_t* index_transfer;
424426

425427
/// Opaque pointer for the application use only. Not accessed by the library.
426428
void* user;

0 commit comments

Comments
 (0)