Commit 4bd91fb

wip
1 parent 1e7a967 commit 4bd91fb

2 files changed: +127 -99 lines changed

libudpard/udpard.c

Lines changed: 76 additions & 73 deletions
@@ -470,13 +470,14 @@ static bool header_deserialize(const udpard_bytes_mut_t dgram_payload,
 // ---------------------------------------------------------------------------------------------------------------------
 
 /// This may be allocated in the NIC DMA region so we keep overheads tight.
-/// An alternative approach would be to have a flex array of tx_frame_t* pointers in the tx_transfer_t.
 typedef struct tx_frame_t
 {
     struct tx_frame_t* next;
     byte_t data[];
 } tx_frame_t;
 
+static size_t tx_frame_size(const size_t mtu) { return sizeof(tx_frame_t) + mtu + HEADER_SIZE_BYTES; }
+
 typedef struct tx_transfer_t
 {
     /// Various indexes this transfer is a member of.
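
The new tx_frame_size() helper pins down the block size that the payload memory resource must be able to serve for a given MTU. A brief, purely illustrative expansion of the arithmetic (all symbols are the library's own; no concrete values are implied):

// Every datagram buffer allocated from memory.payload occupies, for a given MTU:
//   tx_frame_size(mtu) == sizeof(tx_frame_t)    // just the 'next' link; the flexible array member adds no size
//                         + mtu                 // payload capacity of one frame
//                         + HEADER_SIZE_BYTES;  // the serialized frame header that precedes the payload
// Hence a fixed-size block pool whose blocks are tx_frame_size() of the largest MTU in use can back memory.payload.
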
@@ -492,8 +493,8 @@ typedef struct tx_transfer_t
     tx_frame_t* cursor;
 
     /// Retransmission state.
-    uint16_t retries;
-    udpard_us_t next_attempt_at;
+    uint_fast8_t retries;
+    udpard_us_t next_attempt_at;
 
     /// All frames except for the last one share the same MTU, so there's no use keeping dedicated size per frame.
     size_t mtu;
@@ -511,7 +512,7 @@ typedef struct tx_transfer_t
 
 static bool tx_validate_mem_resources(const udpard_tx_mem_resources_t memory)
 {
-    return (memory.meta.alloc != NULL) && (memory.meta.free != NULL) &&  //
+    return (memory.transfer.alloc != NULL) && (memory.transfer.free != NULL) &&  //
            (memory.payload.alloc != NULL) && (memory.payload.free != NULL);
 }
 
@@ -527,63 +528,63 @@ static int32_t tx_cavl_compare_deadline(const void* const user, const udpard_tre
     return ((*(const udpard_us_t*)user) >= CAVL2_TO_OWNER(node, tx_frame_t, index_deadline)->deadline) ? +1 : -1;
 }
 
-static tx_frame_t* tx_frame_new(const udpard_tx_mem_resources_t memory, const size_t payload_size)
-{
-    tx_frame_t* const out = mem_alloc(memory.payload, sizeof(tx_frame_t) + payload_size);
-    if (out != NULL) {
-        out->next = NULL;  // Last by default.
-    }
-    return out;
-}
-
-typedef struct
-{
-    tx_frame_t* head;
-    tx_frame_t* tail;
-    size_t count;
-} tx_chain_t;
-
-/// The tail is NULL if OOM. The caller is responsible for freeing the memory allocated for the chain.
-static tx_chain_t tx_spool(const udpard_tx_mem_resources_t memory,
-                           const size_t mtu,
-                           const meta_t meta,
-                           const udpard_bytes_t payload)
+/// Returns the head of the transfer chain; NULL on OOM.
+static tx_frame_t* tx_spool(const udpard_tx_mem_resources_t memory,
+                            const size_t mtu,
+                            const meta_t meta,
+                            const udpard_bytes_t payload)
 {
     UDPARD_ASSERT(mtu > 0);
     UDPARD_ASSERT((payload.data != NULL) || (payload.size == 0U));
-    uint32_t prefix_crc = CRC_INITIAL;
-    tx_chain_t out = { NULL, NULL, 0 };
-    size_t offset = 0U;
+    uint32_t prefix_crc = CRC_INITIAL;
+    tx_frame_t* head = NULL;
+    tx_frame_t* tail = NULL;
+    size_t frame_index = 0U;
+    size_t offset = 0U;
+    // Run the O(n) copy loop, where n is the payload size.
+    // The client doesn't have to ensure that the payload data survives beyond this function call.
     do {
-        const size_t progress = smaller(payload.size - offset, mtu);
-        tx_frame_t* const item = tx_frame_new(memory, progress + HEADER_SIZE_BYTES);
-        if (NULL == out.head) {
-            out.head = item;
-        } else {
-            out.tail->next = item;
+        // Compute the size of the next frame, allocate it and link it up in the chain.
+        const size_t progress = smaller(payload.size - offset, mtu);
+        {
+            tx_frame_t* const item = mem_alloc(memory.payload, sizeof(tx_frame_t) + progress + HEADER_SIZE_BYTES);
+            if (NULL == head) {
+                head = item;
+            } else {
+                tail->next = item;
+            }
+            tail = item;
         }
-        out.tail = item;
-        if (NULL == out.tail) {
+        // On OOM, deallocate the entire chain and quit.
+        if (NULL == tail) {
+            while (head != NULL) {
+                tx_frame_t* const next = head->next;
+                mem_free(memory.payload, tx_frame_size((head == tail) ? progress : mtu), head);
+                head = next;
+            }
             break;
         }
+        // Populate the frame contents.
+        tail->next = NULL;
        const byte_t* const read_ptr = ((const byte_t*)payload.data) + offset;
        prefix_crc = crc_add(prefix_crc, progress, read_ptr);
        byte_t* const write_ptr =
-            header_serialize(item->data, meta, (uint32_t)out.count, (uint32_t)offset, prefix_crc ^ CRC_OUTPUT_XOR);
+            header_serialize(tail->data, meta, (uint32_t)frame_index, (uint32_t)offset, prefix_crc ^ CRC_OUTPUT_XOR);
        (void)memcpy(write_ptr, read_ptr, progress);  // NOLINT(*DeprecatedOrUnsafeBufferHandling)
+        // Advance the state.
+        ++frame_index;
        offset += progress;
        UDPARD_ASSERT(offset <= payload.size);
-        out.count++;
     } while (offset < payload.size);
-    UDPARD_ASSERT((offset == payload.size) || (out.tail == NULL));
-    return out;
+    UDPARD_ASSERT((offset == payload.size) || ((head == NULL) && (tail == NULL)));
+    return head;
 }
 
-/// Derives the ack timeout for an outgoing transfer using an empirical formula.
+/// Derives the ack timeout for an outgoing transfer.
 /// The number of retries is initially zero when the transfer is sent for the first time.
-static udpard_us_t tx_ack_timeout(const udpard_us_t baseline, const udpard_prio_t prio, const uint16_t retries)
+static udpard_us_t tx_ack_timeout(const udpard_us_t baseline, const udpard_prio_t prio, const uint_fast8_t retries)
 {
-    return baseline * (1L << smaller((uint16_t)prio + retries, 15));  // NOLINT(*-signed-bitwise)
+    return baseline * (1LL << smaller((size_t)prio + retries, UDPARD_TX_RETRY_MAX));  // NOLINT(*-signed-bitwise)
 }
 
 static uint32_t tx_push(udpard_tx_t* const tx,
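
The back-off implied by tx_ack_timeout() doubles with every attempt until the shift count saturates at UDPARD_TX_RETRY_MAX. A worked example, assuming the default baseline of 16000 us (UDPARD_TX_ACK_BASELINE_TIMEOUT_DEFAULT_us) and a priority whose numeric value is 4; the priority value is an assumption for illustration only:

// retries = 0:  16000 us * 2^(4 + 0) =  256 ms   (first transmission)
// retries = 1:  16000 us * 2^(4 + 1) =  512 ms   (first retransmission)
// retries = 2:  16000 us * 2^(4 + 2) = 1024 ms
// ...the shift count is capped at UDPARD_TX_RETRY_MAX, which bounds any further growth of the timeout.
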
@@ -594,39 +595,41 @@ static uint32_t tx_push(udpard_tx_t* const tx,
                         void* const user_transfer_reference)
 {
     UDPARD_ASSERT(tx != NULL);
-    uint32_t out = 0;  // The number of frames enqueued; zero on error (error counters incremented).
-    const size_t mtu = larger(tx->mtu, UDPARD_MTU_MIN);
-    const size_t frame_count = larger(1, (payload.size + mtu - 1U) / mtu);
-    if ((tx->queue_size + frame_count) > tx->queue_capacity) {
+    uint32_t out = 0;  // The number of frames enqueued; zero on error (error counters incremented).
+    const size_t payload_size = payload.size;
+    const size_t mtu = larger(tx->mtu, UDPARD_MTU_MIN);
+    const size_t mtu_last = ((payload_size % mtu) != 0U) ? (payload_size % mtu) : mtu;
+    const size_t n_frames = larger(1, (payload_size + mtu - 1U) / mtu);
+    if ((tx->queue_size + n_frames) > tx->queue_capacity) {
         tx->errors_capacity++;
     } else {
-        const tx_chain_t chain = tx_spool(tx->memory, mtu, deadline, meta, endpoint, payload, user_transfer_reference);
-        if (chain.tail != NULL) {  // Insert the head into the tx index. Only the head, the rest is linked-listed.
-            tx_frame_t* const head = chain.head;
-            UDPARD_ASSERT(frame_count == chain.count);
-            const udpard_tree_t* res = cavl2_find_or_insert(
-                &tx->index_order, &head->priority, &tx_cavl_compare_prio, &head->index_order, &cavl2_trivial_factory);
-            UDPARD_ASSERT(res == &head->index_order);
-            (void)res;
-            res = cavl2_find_or_insert(&tx->index_deadline,
-                                       &head->deadline,
-                                       &tx_cavl_compare_deadline,
-                                       &head->index_deadline,
-                                       &cavl2_trivial_factory);
-            UDPARD_ASSERT(res == &head->index_deadline);
-            (void)res;
-            tx->queue_size += chain.count;
-            UDPARD_ASSERT(tx->queue_size <= tx->queue_capacity);
-            out = (uint32_t)chain.count;
-        } else {  // The queue is large enough but we ran out of heap memory, so we have to unwind the chain.
-            tx->errors_oom++;
-            tx_frame_t* head = chain.head;
-            while (head != NULL) {
-                tx_frame_t* const next = head->next;
-                mem_free(tx->memory.payload, head->datagram_payload.size, head->datagram_payload.data);
-                mem_free(tx->memory.fragment, sizeof(tx_frame_t), head);
-                head = next;
+        tx_transfer_t* const tr = mem_alloc(tx->memory.transfer, sizeof(tx_transfer_t));
+        if (tr != NULL) {
+            mem_zero(sizeof(*tr), tr);
+            tr->retries = 0;
+            tr->next_attempt_at = BIG_BANG;  // TODO: we can implement time-triggered comms here.
+            tr->mtu = mtu;
+            tr->mtu_last = mtu_last;
+            tr->topic_hash = meta.topic_hash;
+            tr->transfer_id = meta.transfer_id;
+            tr->deadline = deadline;
+            tr->reliable = meta.flag_ack;
+            tr->priority = meta.priority;
+            tr->destination = endpoint;
+            tr->user_transfer_reference = user_transfer_reference;
+            tr->head = tr->cursor = tx_spool(tx->memory, mtu, meta, payload);
+            if (tr->head != NULL) {
+                // TODO: insert
+                // Finalize
+                tx->queue_size += n_frames;
+                UDPARD_ASSERT(tx->queue_size <= tx->queue_capacity);
+                out = (uint32_t)n_frames;
+            } else {  // The queue is large enough but we ran out of heap memory.
+                tx->errors_oom++;
+                mem_free(tx->memory.transfer, sizeof(tx_transfer_t), tr);
             }
+        } else {  // The queue is large enough but we couldn't allocate the transfer metadata object.
+            tx->errors_oom++;
         }
     }
     return out;
libudpard/udpard.h

Lines changed: 51 additions & 26 deletions
@@ -27,6 +27,10 @@
 /// - sizeof(rx_session_t) blocks for the RX pipeline.
 /// - sizeof(udpard_fragment_t) blocks for the RX pipeline.
 ///
+/// Suitable allocators may be found here:
+/// - Constant-time ultrafast deterministic heap: https://github.com/pavel-kirienko/o1heap
+/// - Single-header fixed-size block pool: https://gist.github.com/pavel-kirienko/daf89e0481e6eac0f1fa8a7614667f59
+///
 /// --------------------------------------------------------------------------------------------------------------------
 ///
 /// This software is distributed under the terms of the MIT License.
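
As a companion to the allocator references added above, here is a minimal sketch of wiring o1heap into the TX memory resources. The o1heap calls are that library's real API; the udpard_mem_resource_t field names (context/alloc/free) and the callback signatures are assumptions and must be checked against the actual declarations in this header.

#include "udpard.h"
#include "o1heap.h"
#include <stdint.h>

// Assumed adapter signatures: a user-context pointer, the block size, and (for free) the block pointer.
static void* my_alloc(void* const context, const size_t size)
{
    return o1heapAllocate((O1HeapInstance*)context, size);
}
static void my_free(void* const context, const size_t size, void* const pointer)
{
    (void)size;  // o1heap tracks block sizes internally.
    o1heapFree((O1HeapInstance*)context, pointer);
}

static _Alignas(O1HEAP_ALIGNMENT) uint8_t my_heap_arena[64 * 1024];  // arena size chosen arbitrarily for the example

static udpard_tx_mem_resources_t my_make_tx_memory(void)
{
    O1HeapInstance* const heap = o1heapInit(my_heap_arena, sizeof(my_heap_arena));
    const udpard_tx_mem_resources_t out = {
        .transfer = { .context = heap, .alloc = my_alloc, .free = my_free },  // 'context' is an assumed field name
        .payload  = { .context = heap, .alloc = my_alloc, .free = my_free },
    };
    return out;
}
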
@@ -91,6 +95,9 @@ typedef int64_t udpard_us_t;
 /// See udpard_tx_t::ack_baseline_timeout.
 #define UDPARD_TX_ACK_BASELINE_TIMEOUT_DEFAULT_us 16000LL
 
+/// The maximum number of transmission attempts for a transfer is capped at this value irrespective of other settings.
+#define UDPARD_TX_RETRY_MAX 31U
+
 /// The subject-ID only affects the formation of the multicast UDP/IP endpoint address.
 /// In IPv4 networks, it is limited to 23 bits only due to the limited MAC multicast address space.
 /// In IPv6 networks, 32 bits are supported.
@@ -294,16 +301,29 @@ typedef struct udpard_tx_t udpard_tx_t;
 /// If the application knows its MTU, it can use block allocation to avoid extrinsic fragmentation.
 typedef struct udpard_tx_mem_resources_t
 {
-    /// The queue bookkeeping structures are allocated per datagram.
-    /// Each instance is a small fixed-size object, so a trivial zero-fragmentation block allocator is enough.
-    udpard_mem_resource_t meta;
+    /// The queue bookkeeping structures are allocated per outgoing transfer.
+    /// Each instance is sizeof(tx_transfer_t), so a trivial zero-fragmentation block allocator is enough.
+    udpard_mem_resource_t transfer;
 
     /// The UDP datagram payload buffers are allocated per frame; each buffer is of size at most
-    /// (MTU+sizeof(void*)+HEADER_SIZE) bytes, so a trivial zero-fragmentation MTU-sized+pointer block allocator
-    /// is enough if MTU is known in advance.
+    /// (HEADER_SIZE+MTU+sizeof(void*)) bytes, so a trivial block pool is enough if MTU is known in advance.
     udpard_mem_resource_t payload;
 } udpard_tx_mem_resources_t;
 
+/// Outcome notification for a reliable transfer previously scheduled for transmission.
+typedef struct udpard_tx_feedback_t
+{
+    uint64_t topic_hash;
+    uint32_t transfer_id;
+    udpard_udpip_ep_t remote_ep;
+
+    uint_fast8_t retries;  ///< The number of attempts equals retries plus one.
+    bool success;          ///< False if no ack was received from the remote end before deadline expiration.
+
+    /// This is the same pointer that was passed to udpard_tx_push().
+    void* user_transfer_reference;
+} udpard_tx_feedback_t;
+
 /// The TX frame ejection handler returns one of these results to guide the udpard_tx_poll() logic.
 typedef enum udpard_tx_eject_result_t
 {
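
A minimal sketch of a handler consuming the new udpard_tx_feedback_t, assuming the feedback vtable signature introduced in the next hunk; my_log() and my_handle_delivery_failure() are hypothetical application hooks, not library functions.

static void my_tx_feedback(udpard_tx_t* const tx, const udpard_tx_feedback_t fb)
{
    (void)tx;
    if (fb.success) {
        my_log("transfer %u on topic %llx acknowledged after %u retries",
               (unsigned)fb.transfer_id, (unsigned long long)fb.topic_hash, (unsigned)fb.retries);
    } else {  // No ack arrived before the deadline; retries+1 attempts were made in total.
        my_log("transfer %u on topic %llx timed out", (unsigned)fb.transfer_id, (unsigned long long)fb.topic_hash);
        my_handle_delivery_failure(fb.user_transfer_reference);  // e.g., schedule a re-publication
    }
}
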
@@ -312,33 +332,38 @@
     udpard_tx_eject_failed,  ///< An unrecoverable error occurred while submitting the frame; drop it from the TX queue.
 } udpard_tx_eject_result_t;
 
+typedef struct udpard_tx_ejection_t
+{
+    udpard_us_t now;
+
+    /// Specifies when the frame should be considered expired and dropped if not yet transmitted;
+    /// it is optional to use depending on the implementation of the NIC driver (most traditional drivers ignore it).
+    udpard_us_t deadline;
+
+    uint_fast8_t dscp;              ///< Set the DSCP field of the outgoing packet to this.
+    udpard_udpip_ep_t destination;  ///< Unicast or multicast UDP/IP endpoint.
+
+    /// If the result is udpard_tx_eject_success, the application is responsible for freeing the datagram_origin.data
+    /// using self->memory.payload.free() at some point in the future (either within the callback or later),
+    /// unless datagram_origin.data is NULL, in which case the library will retain the ownership.
+    /// It may help to know that the view is a small fixed offset greater than the origin,
+    /// so both may not have to be kept, depending on the implementation.
+    udpard_bytes_t datagram_view;        ///< Transmit this; do not free it.
+    udpard_bytes_mut_t datagram_origin;  ///< Free this unless NULL.
+
+    /// This is the same pointer that was passed to udpard_tx_push().
+    void* user_transfer_reference;
+} udpard_tx_ejection_t;
+
 typedef struct udpard_tx_vtable_t
 {
     /// Invoked from udpard_tx_poll() to push outgoing UDP datagrams into the socket/NIC driver.
-    /// The deadline specifies when the frame should be considered expired and dropped if not yet transmitted;
-    /// it is optional to use depending on the implementation of the NIC driver (most traditional drivers ignore it).
-    /// If the result is udpard_tx_eject_success, the application is responsible for freeing the datagram_payload.data
-    /// using self->memory.payload.free() at some point in the future (either within the callback or later).
-    udpard_tx_eject_result_t (*eject)(udpard_tx_t* const self,
-                                      const udpard_us_t now,
-                                      const udpard_us_t deadline,
-                                      const uint_fast8_t dscp,
-                                      const udpard_udpip_ep_t destination,
-                                      const udpard_bytes_t datagram_view,        ///< Transmit this. Do not free() this.
-                                      const udpard_bytes_mut_t datagram_origin,  ///< free() only this.
-                                      void* const user_transfer_reference);
+    udpard_tx_eject_result_t (*eject)(udpard_tx_t*, udpard_tx_ejection_t);
 
     /// Invoked from udpard_tx_poll() to report the result of reliable transfer transmission attempts.
-    /// This is ALWAYS invoked EXACTLY ONCE per reliable transfer pushed via udpard_tx_push();
+    /// This is ALWAYS invoked EXACTLY ONCE per reliable transfer successfully pushed via udpard_tx_push();
     /// this is NOT invoked for best-effort (non-reliable) transfers.
-    /// The user_transfer_reference is the same pointer that was passed to udpard_tx_push().
-    /// The 'ok' flag is true if the transfer has been successfully confirmed by the remote end, false if timed out.
-    void (*feedback)(udpard_tx_t* const self,
-                     const uint64_t topic_hash,
-                     const uint32_t transfer_id,
-                     const udpard_udpip_ep_t remote_ep,
-                     void* const user_transfer_reference,
-                     const bool ok);
+    void (*feedback)(udpard_tx_t*, udpard_tx_feedback_t);
 } udpard_tx_vtable_t;
 
 /// The transmission pipeline is a prioritized transmission queue that keeps UDP datagrams (aka transport frames)
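
And a matching sketch of an eject handler built on the new udpard_tx_ejection_t aggregate, assuming a plain blocking POSIX UDP socket. my_udp_fd and my_make_sockaddr() are placeholders, and the payload free-callback signature (context, size, pointer) is an assumption; only the use of self->memory.payload.free() itself is prescribed by the comments above.

#include "udpard.h"
#include <sys/socket.h>
#include <netinet/in.h>

extern int my_udp_fd;                                              // placeholder: an already-configured UDP socket
extern struct sockaddr_in my_make_sockaddr(udpard_udpip_ep_t ep);  // placeholder: endpoint conversion helper

static udpard_tx_eject_result_t my_tx_eject(udpard_tx_t* const self, const udpard_tx_ejection_t ej)
{
    // ej.deadline and ej.dscp are ignored in this simple driver; a smarter one could drop frames whose
    // deadline precedes ej.now and map the DSCP value onto the IP_TOS socket option.
    const struct sockaddr_in dst = my_make_sockaddr(ej.destination);
    const ssize_t rc = sendto(my_udp_fd, ej.datagram_view.data, ej.datagram_view.size, 0,
                              (const struct sockaddr*)&dst, sizeof(dst));
    if (rc < 0) {
        return udpard_tx_eject_failed;  // Treated as unrecoverable; the library drops the frame.
    }
    if (ej.datagram_origin.data != NULL) {  // Ownership is ours on success; this driver releases it immediately.
        self->memory.payload.free(self->memory.payload.context, ej.datagram_origin.size, ej.datagram_origin.data);
    }
    return udpard_tx_eject_success;
}
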
