Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,30 @@ private static boolean isKnownKafkaConnectSuffix(String name, int start, int len
|| (len == 3 && name.regionMatches(true, start, "map", 0, 3));
}

/**
* Strips URI prefixes ({@code queue://} and {@code topic://}) that IBM MQ's JMS implementation
* can return from {@code Queue.getQueueName()} when destinations are created with URI syntax.
*/
static String sanitizeUriPrefix(String name) {
if (name == null || name.length() < 9) {
return name;
}
int offset = -1;
if (name.length() >= 8 && name.regionMatches(true, 0, "queue://", 0, 8)) {
offset = 8;
} else if (name.length() >= 8 && name.regionMatches(true, 0, "topic://", 0, 8)) {
offset = 8;
}
if (offset < 0) {
return name;
}
// Strip any remaining leading '/' characters (handles queue:///QUEUE -> /QUEUE -> QUEUE)
while (offset < name.length() && name.charAt(offset) == '/') {
offset++;
}
return offset < name.length() ? name.substring(offset) : name;
}

public CharSequence toResourceName(String destinationName, boolean isQueue) {
if (null == destinationName) {
return isQueue ? queueTempResourceName : topicTempResourceName;
Expand Down Expand Up @@ -284,6 +308,10 @@ public String getDestinationName(Destination destination) {
} catch (Exception e) {
log.debug("Unable to get jms destination name", e);
}
if (null != name) {
// Strip URI prefixes (queue:// and topic://) from IBM MQ destination names
name = sanitizeUriPrefix(name);
}
if (null != name && !name.startsWith(TIBCO_TMP_PREFIX)) {
// Sanitize Kafka Connect schema-derived suffixes from queue/topic names
return sanitizeDestinationName(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,59 @@ class JMSDecoratorTest extends Specification {
result == null
}

def "test getDestinationName strips URI prefixes from IBM MQ destination names"() {
given:
def decorator = JMSDecorator.CONSUMER_DECORATE

when:
def queue = Mock(Queue) {
getQueueName() >> rawQueueName
}
def result = decorator.getDestinationName(queue)

then:
result == expectedName

where:
rawQueueName | expectedName
// Triple slash (no queue manager) - most common IBM MQ URI form
"queue:///MY.QUEUE" | "MY.QUEUE"
// Double slash
"queue://MY.QUEUE" | "MY.QUEUE"
// Topic URI prefixes
"topic:///MY.TOPIC" | "MY.TOPIC"
"topic://MY.TOPIC" | "MY.TOPIC"
// Case insensitive
"QUEUE:///MY.QUEUE" | "MY.QUEUE"
"Queue:///MY.QUEUE" | "MY.QUEUE"
"TOPIC:///MY.TOPIC" | "MY.TOPIC"
// Names without prefix pass through unchanged
"MY.QUEUE" | "MY.QUEUE"
"simple" | "simple"
// Combined: URI prefix + Kafka Connect suffix both stripped
"queue:///myqueue_messagebody_0" | "myqueue"
}

def "test getDestinationName strips URI prefixes from topic destinations"() {
given:
def decorator = JMSDecorator.CONSUMER_DECORATE

when:
def topic = Mock(Topic) {
getTopicName() >> rawTopicName
}
def result = decorator.getDestinationName(topic)

then:
result == expectedName

where:
rawTopicName | expectedName
"topic:///MY.TOPIC" | "MY.TOPIC"
"topic://MY.TOPIC" | "MY.TOPIC"
"queue:///MY.TOPIC" | "MY.TOPIC"
}

def "test getDestinationName returns null for TIBCO temp prefix"() {
given:
def decorator = JMSDecorator.CONSUMER_DECORATE
Expand Down
Loading