1919import java .util .Collections ;
2020import java .util .Iterator ;
2121import java .util .List ;
22+ import java .util .Map ;
2223import java .util .Set ;
2324import java .util .UUID ;
2425
@@ -94,14 +95,14 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
9495 * Counter on how many messages are prefetched into internal messageQueue.
9596 */
9697 protected int messagesPrefetched = 0 ;
97-
98+
9899 /**
99100 * Counter on how many messages have been explicitly requested.
100101 * TODO: Consider renaming this class and several other variables now that
101102 * this logic factors in message requests as well as prefetching.
102103 */
103104 protected int messagesRequested = 0 ;
104-
105+
105106 /**
106107 * States of the prefetch thread
107108 */
@@ -121,7 +122,7 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
121122 * 25ms delayInterval.
122123 */
123124 protected ExponentialBackoffStrategy backoffStrategy = new ExponentialBackoffStrategy (25 ,25 ,2000 );
124-
125+
125126 SQSMessageConsumerPrefetch (SQSSessionCallbackScheduler sqsSessionRunnable , Acknowledger acknowledger ,
126127 NegativeAcknowledger negativeAcknowledger , SQSQueueDestination sqsDestination ,
127128 AmazonSQSMessagingClientWrapper amazonSQSClient , int numberOfMessagesToPrefetch ) {
@@ -139,16 +140,16 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
139140 MessageListener getMessageListener () {
140141 return messageListener ;
141142 }
142-
143+
143144 void setMessageConsumer (SQSMessageConsumer messageConsumer ) {
144145 this .messageConsumer = messageConsumer ;
145146 }
146-
147+
147148 @ Override
148149 public SQSMessageConsumer getMessageConsumer () {
149150 return messageConsumer ;
150151 }
151-
152+
152153 /**
153154 * Sets the message listener.
154155 * <P>
@@ -168,7 +169,7 @@ protected void setMessageListener(MessageListener messageListener) {
168169 if (!running || isClosed ()) {
169170 return ;
170171 }
171-
172+
172173 List <MessageManager > allPrefetchedMessages = new ArrayList <MessageManager >(messageQueue );
173174 sqsSessionRunnable .scheduleCallBacks (messageListener , allPrefetchedMessages );
174175 messageQueue .clear ();
@@ -179,7 +180,7 @@ protected void setMessageListener(MessageListener messageListener) {
179180 messageListenerReady ();
180181 }
181182 }
182-
183+
183184 /**
184185 * Determine the number of messages we should attempt to fetch from SQS.
185186 * Returns the difference between the number of messages needed (either for
@@ -189,7 +190,7 @@ private int numberOfMessagesToFetch() {
189190 int numberOfMessagesNeeded = Math .max (numberOfMessagesToPrefetch , messagesRequested );
190191 return Math .max (numberOfMessagesNeeded - messagesPrefetched , 0 );
191192 }
192-
193+
193194 /**
194195 * Runs until the message consumer is closed and in-progress SQS
195196 * <code>receiveMessage</code> call returns.
@@ -210,7 +211,7 @@ public void run() {
210211 if (isClosed ()) {
211212 break ;
212213 }
213-
214+
214215 synchronized (stateLock ) {
215216 waitForStart ();
216217 waitForPrefetch ();
@@ -238,7 +239,7 @@ public void run() {
238239 }
239240 }
240241 }
241-
242+
242243 /**
243244 * Call <code>receiveMessage</code> with the given wait time.
244245 */
@@ -259,7 +260,7 @@ protected List<Message> getMessages(int batchSize, int waitTimeSeconds) throws J
259260 ReceiveMessageResult receivedMessageResult = amazonSQSClient .receiveMessage (receiveMessageRequest );
260261 return receivedMessageResult .getMessages ();
261262 }
262-
263+
263264 /**
264265 * Converts the received message to JMS message, and pushes to messages to
265266 * either callback scheduler for asynchronous message delivery or to
@@ -278,14 +279,14 @@ protected void processReceivedMessages(List<Message> messages) {
278279 nackMessages .add (message .getReceiptHandle ());
279280 }
280281 }
281-
282+
282283 synchronized (stateLock ) {
283284 if (messageListener != null ) {
284285 sqsSessionRunnable .scheduleCallBacks (messageListener , messageManagers );
285286 } else {
286287 messageQueue .addAll (messageManagers );
287288 }
288-
289+
289290 messagesPrefetched += messageManagers .size ();
290291 notifyStateChange ();
291292 }
@@ -359,9 +360,9 @@ protected javax.jms.Message convertToJMSMessage(Message message) throws JMSExcep
359360 throw new JMSException ("Not a supported JMS message type" );
360361 }
361362 }
362-
363+
363364 jmsMessage .setJMSDestination (sqsDestination );
364-
365+
365366 MessageAttributeValue replyToQueueNameAttribute = message .getMessageAttributes ().get (
366367 SQSMessage .JMS_SQS_REPLY_TO_QUEUE_NAME );
367368 MessageAttributeValue replyToQueueUrlAttribute = message .getMessageAttributes ().get (
@@ -372,16 +373,27 @@ protected javax.jms.Message convertToJMSMessage(Message message) throws JMSExcep
372373 Destination replyToQueue = new SQSQueueDestination (replyToQueueName , replyToQueueUrl );
373374 jmsMessage .setJMSReplyTo (replyToQueue );
374375 }
375-
376+
376377 MessageAttributeValue correlationIdAttribute = message .getMessageAttributes ().get (
377378 SQSMessage .JMS_SQS_CORRELATION_ID );
378379 if (correlationIdAttribute != null ) {
379380 jmsMessage .setJMSCorrelationID (correlationIdAttribute .getStringValue ());
380381 }
381-
382+
383+ jmsMessage .setJMSTimestamp (getJMSTimestamp (message ));
382384 return jmsMessage ;
383385 }
384386
387+ private long getJMSTimestamp (Message message ) {
388+ Map <String , String > systemAttributes = message .getAttributes ();
389+ String timestamp = systemAttributes .get (SQSMessagingClientConstants .SENT_TIMESTAMP );
390+ if (timestamp != null ) {
391+ return Long .parseLong (timestamp );
392+ } else {
393+ return 0L ;
394+ }
395+ }
396+
385397 protected void nackQueueMessages () {
386398 // Also nack messages already in the messageQueue
387399 synchronized (stateLock ) {
@@ -407,7 +419,7 @@ protected void waitForStart() throws InterruptedException {
407419 }
408420 }
409421 }
410-
422+
411423 @ Override
412424 public void messageDispatched () {
413425 synchronized (stateLock ) {
@@ -429,7 +441,7 @@ public void messageListenerReady() {
429441 }
430442 }
431443 }
432-
444+
433445 void requestMessage () {
434446 synchronized (stateLock ) {
435447 messagesRequested ++;
@@ -443,7 +455,7 @@ private void unrequestMessage() {
443455 notifyStateChange ();
444456 }
445457 }
446-
458+
447459 public static class MessageManager {
448460
449461 private final PrefetchManager prefetchManager ;
@@ -467,7 +479,7 @@ public javax.jms.Message getMessage() {
467479 javax .jms .Message receive () throws JMSException {
468480 return receive (0 );
469481 }
470-
482+
471483 javax .jms .Message receive (long timeout ) throws JMSException {
472484 if (cannotDeliver ()) {
473485 return null ;
@@ -476,7 +488,7 @@ javax.jms.Message receive(long timeout) throws JMSException {
476488 if (timeout < 0 ) {
477489 timeout = 0 ;
478490 }
479-
491+
480492 MessageManager messageManager = null ;
481493 synchronized (stateLock ) {
482494 requestMessage ();
@@ -486,7 +498,7 @@ javax.jms.Message receive(long timeout) throws JMSException {
486498 messageManager = messageQueue .pollFirst ();
487499 } else {
488500 long startTime = System .currentTimeMillis ();
489-
501+
490502 long waitTime = 0 ;
491503 while (messageQueue .isEmpty () && !isClosed () &&
492504 (timeout == 0 || (waitTime = getWaitTime (timeout , startTime )) > 0 )) {
@@ -525,7 +537,7 @@ javax.jms.Message receiveNoWait() throws JMSException {
525537 if (cannotDeliver ()) {
526538 return null ;
527539 }
528-
540+
529541 MessageManager messageManager ;
530542 synchronized (stateLock ) {
531543 if (messageQueue .isEmpty () && numberOfMessagesToPrefetch == 0 ) {
@@ -572,22 +584,22 @@ void close() {
572584 messageListener = null ;
573585 }
574586 }
575-
587+
576588 /**
577589 * Helper that notifies PrefetchThread that message is dispatched and AutoAcknowledge
578590 */
579591 private javax .jms .Message messageHandler (MessageManager messageManager ) throws JMSException {
580592 if (messageManager == null ) {
581593 return null ;
582- }
594+ }
583595 javax .jms .Message message = messageManager .getMessage ();
584-
596+
585597 // Notify PrefetchThread that message is dispatched
586598 this .messageDispatched ();
587599 acknowledger .notifyMessageReceived ((SQSMessage ) message );
588600 return message ;
589601 }
590-
602+
591603 private boolean cannotDeliver () throws JMSException {
592604 if (!running ) {
593605 return true ;
@@ -602,7 +614,7 @@ private boolean cannotDeliver() throws JMSException {
602614 }
603615 return false ;
604616 }
605-
617+
606618 /**
607619 * Sleeps for the configured time.
608620 */
@@ -613,7 +625,7 @@ protected void sleep(long sleepTimeMillis) throws InterruptedException {
613625 throw e ;
614626 }
615627 }
616-
628+
617629 protected boolean isClosed () {
618630 return closed ;
619631 }
@@ -641,7 +653,7 @@ List<SQSMessageIdentifier> purgePrefetchedMessagesWithGroups(Set<String> affecte
641653
642654 notifyStateChange ();
643655 }
644-
656+
645657 return purgedMessages ;
646658 }
647659}
0 commit comments