Skip to content

Commit 599a538

Browse files
authored
pubsub: ensure atleast 1 message is pulled (#197)
1 parent 30af604 commit 599a538

File tree

3 files changed

+108
-5
lines changed

3 files changed

+108
-5
lines changed

pubsub/pubsub-client/src/main/java/com/salesforce/multicloudj/pubsub/driver/AbstractSubscription.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ private QueueConfig() {
133133
protected final AtomicBoolean isShutdown = new AtomicBoolean(false);
134134

135135
/** Current best estimate for how many messages to fetch in order to maintain the desired queue duration. */
136-
private int runningBatchSize = 1;
136+
private double runningBatchSize = 1;
137137

138138
/** Timestamp (in nanoseconds) when the current throughput measurement window started. */
139139
private long throughputStart = 0L;
@@ -520,18 +520,19 @@ private int updateBatchSize() {
520520
double minSize = runningBatchSize * QueueConfig.MAX_SHRINK_FACTOR;
521521

522522
if (newBatchSize > maxSize) {
523-
runningBatchSize = (int) maxSize;
523+
runningBatchSize = maxSize;
524524
} else if (newBatchSize < minSize) {
525-
runningBatchSize = (int) minSize;
525+
runningBatchSize = minSize;
526526
} else {
527-
runningBatchSize = (int) newBatchSize;
527+
runningBatchSize = newBatchSize;
528528
}
529529
}
530530

531531
throughputStart = now;
532532
throughputCount = 0;
533533

534-
return Math.min(runningBatchSize, QueueConfig.MAX_BATCH_SIZE);
534+
// Rounding up to the nearest integer to avoid pulling 0 messages
535+
return (int) Math.ceil(Math.min(runningBatchSize, QueueConfig.MAX_BATCH_SIZE));
535536
}
536537

537538
/**

pubsub/pubsub-client/src/test/java/com/salesforce/multicloudj/pubsub/client/AbstractPubsubIT.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.junit.jupiter.api.AfterAll;
1212
import org.junit.jupiter.api.AfterEach;
1313
import org.junit.jupiter.api.Assertions;
14+
import org.junit.jupiter.api.Assumptions;
1415
import org.junit.jupiter.api.BeforeAll;
1516
import org.junit.jupiter.api.BeforeEach;
1617
import org.junit.jupiter.api.Disabled;
@@ -26,6 +27,7 @@
2627
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
2728
public abstract class AbstractPubsubIT {
2829

30+
protected static final String AWS_PROVIDER_ID = "aws";
2931

3032
public interface Harness extends AutoCloseable {
3133

@@ -347,4 +349,60 @@ public void testGetAttributes() throws Exception {
347349
Assertions.assertNotNull(attributes.getTopic(), "Should have topic attribute");
348350
}
349351
}
352+
353+
@Test
354+
@Timeout(30)
355+
public void testMultipleSendReceiveWithoutBatch() throws Exception {
356+
Assumptions.assumeFalse(AWS_PROVIDER_ID.equals(harness.getProviderId()));
357+
try (AbstractTopic topic = harness.createTopicDriver();
358+
AbstractSubscription subscription = harness.createSubscriptionDriver()) {
359+
360+
int numMessages = 5;
361+
List<Message> sentMessages = new ArrayList<>();
362+
363+
// Send messages one by one (not in batch)
364+
for (int i = 0; i < numMessages; i++) {
365+
Message message = Message.builder()
366+
.withBody(("non-batch-msg-" + i).getBytes())
367+
.withMetadata(Map.of("index", String.valueOf(i)))
368+
.build();
369+
topic.send(message);
370+
sentMessages.add(message);
371+
TimeUnit.MILLISECONDS.sleep(100); // Small delay between sends
372+
}
373+
374+
TimeUnit.MILLISECONDS.sleep(500); // Allow time for messages to be available
375+
376+
// Receive and ack messages one by one (not in batch)
377+
List<Message> receivedMessages = new ArrayList<>();
378+
379+
while (receivedMessages.size() < numMessages) {
380+
try {
381+
Message received = subscription.receive();
382+
if (received != null && received.getAckID() != null) {
383+
receivedMessages.add(received);
384+
// Ack immediately after receiving (not in batch)
385+
subscription.sendAck(received.getAckID());
386+
System.out.println("Received and acked message " + receivedMessages.size() + "/" + numMessages);
387+
} else {
388+
TimeUnit.MILLISECONDS.sleep(100);
389+
}
390+
} catch (Exception e) {
391+
System.err.println("Error receiving message: " + e.getMessage());
392+
TimeUnit.MILLISECONDS.sleep(100);
393+
}
394+
}
395+
396+
Assertions.assertEquals(numMessages, receivedMessages.size(),
397+
"Should receive all messages. Expected: " + numMessages + ", Got: " + receivedMessages.size());
398+
399+
// Verify all messages were received
400+
for (int i = 0; i < receivedMessages.size(); i++) {
401+
Message received = receivedMessages.get(i);
402+
Assertions.assertNotNull(received, "Received message " + i + " should not be null");
403+
Assertions.assertNotNull(received.getBody(), "Received message " + i + " body should not be null");
404+
Assertions.assertNotNull(received.getAckID(), "Received message " + i + " should have AckID");
405+
}
406+
}
407+
}
350408
}

pubsub/pubsub-client/src/test/java/com/salesforce/multicloudj/pubsub/driver/AbstractSubscriptionTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.concurrent.CompletableFuture;
1919
import java.util.concurrent.CountDownLatch;
2020
import java.util.concurrent.ExecutionException;
21+
import java.util.concurrent.atomic.AtomicInteger;
2122

2223
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
2324
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -713,4 +714,47 @@ void testBuilderProviderId() {
713714

714715
assertEquals("my-provider", sub.getProviderId());
715716
}
717+
718+
@Test
719+
void testBatchSizeNeverZero() throws Exception {
720+
// Test that updateBatchSize() never returns 0
721+
MockMessageSource source = new MockMessageSource();
722+
List<Integer> batchSizes = Collections.synchronizedList(new ArrayList<>());
723+
final AtomicInteger callCount = new AtomicInteger(0);
724+
725+
TestSubscription sub = new TestSubscription(source) {
726+
@Override
727+
protected List<Message> doReceiveBatch(int batchSize) {
728+
batchSizes.add(batchSize);
729+
int call = callCount.incrementAndGet();
730+
731+
// First call: fetch the message so receive() doesn't hang
732+
// Subsequent calls: return empty to trigger batch size shrinking
733+
if (call == 1) {
734+
return source.fetchBatch(batchSize);
735+
}
736+
return Collections.emptyList();
737+
}
738+
};
739+
740+
// Add one message to trigger prefetch
741+
source.addMessages(List.of(
742+
Message.builder().withBody("test".getBytes()).build()
743+
));
744+
745+
// Receive the message - this triggers prefetch which calls updateBatchSize()
746+
Message msg = sub.receive();
747+
assertNotNull(msg);
748+
749+
// Wait for prefetch operations to complete
750+
Thread.sleep(500);
751+
752+
// All batch sizes should be >= 1
753+
assertFalse(batchSizes.isEmpty(), "doReceiveBatch should have been called at least once");
754+
for (int size : batchSizes) {
755+
assertTrue(size >= 1, "Batch size from updateBatchSize() should never be less than 1, but got: " + size);
756+
}
757+
758+
sub.close();
759+
}
716760
}

0 commit comments

Comments
 (0)