001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp.message; 018 019import java.util.Map; 020import java.util.Set; 021 022import javax.jms.DeliveryMode; 023import javax.jms.JMSException; 024import javax.jms.Message; 025 026import org.apache.qpid.proton.amqp.Binary; 027import org.apache.qpid.proton.amqp.Decimal128; 028import org.apache.qpid.proton.amqp.Decimal32; 029import org.apache.qpid.proton.amqp.Decimal64; 030import org.apache.qpid.proton.amqp.Symbol; 031import org.apache.qpid.proton.amqp.UnsignedByte; 032import org.apache.qpid.proton.amqp.UnsignedInteger; 033import org.apache.qpid.proton.amqp.UnsignedLong; 034import org.apache.qpid.proton.amqp.UnsignedShort; 035import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; 036import org.apache.qpid.proton.amqp.messaging.Footer; 037import org.apache.qpid.proton.amqp.messaging.Header; 038import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; 039import org.apache.qpid.proton.amqp.messaging.Properties; 040 041public abstract class InboundTransformer { 042 043 JMSVendor vendor; 044 045 public static final String TRANSFORMER_NATIVE = "native"; 046 public static final String TRANSFORMER_RAW = "raw"; 047 public static final String TRANSFORMER_JMS = "jms"; 048 049 String prefixVendor = "JMS_AMQP_"; 050 String prefixDeliveryAnnotations = "DA_"; 051 String prefixMessageAnnotations = "MA_"; 052 String prefixFooter = "FT_"; 053 054 int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE; 055 int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY; 056 long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE; 057 058 public InboundTransformer(JMSVendor vendor) { 059 this.vendor = vendor; 060 } 061 062 public abstract Message transform(EncodedMessage amqpMessage) throws Exception; 063 064 public abstract String getTransformerName(); 065 066 public abstract InboundTransformer getFallbackTransformer(); 067 068 public int getDefaultDeliveryMode() { 069 return defaultDeliveryMode; 070 } 071 072 public void setDefaultDeliveryMode(int defaultDeliveryMode) { 073 this.defaultDeliveryMode = defaultDeliveryMode; 074 } 075 076 public int getDefaultPriority() { 077 return defaultPriority; 078 } 079 080 public void setDefaultPriority(int defaultPriority) { 081 this.defaultPriority = defaultPriority; 082 } 083 084 public long getDefaultTtl() { 085 return defaultTtl; 086 } 087 088 public void setDefaultTtl(long defaultTtl) { 089 this.defaultTtl = defaultTtl; 090 } 091 092 public String getPrefixVendor() { 093 return prefixVendor; 094 } 095 096 public void setPrefixVendor(String prefixVendor) { 097 this.prefixVendor = prefixVendor; 098 } 099 100 public JMSVendor getVendor() { 101 return vendor; 102 } 103 104 public void setVendor(JMSVendor vendor) { 105 this.vendor = vendor; 106 } 107 108 @SuppressWarnings("unchecked") 109 protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception { 110 Header header = amqp.getHeader(); 111 if (header == null) { 112 header = new Header(); 113 } 114 115 if (header.getDurable() != null) { 116 jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); 117 } else { 118 jms.setJMSDeliveryMode(defaultDeliveryMode); 119 } 120 if (header.getPriority() != null) { 121 jms.setJMSPriority(header.getPriority().intValue()); 122 } else { 123 jms.setJMSPriority(defaultPriority); 124 } 125 if (header.getFirstAcquirer() != null) { 126 jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer()); 127 } 128 if (header.getDeliveryCount() != null) { 129 vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue()); 130 } 131 132 final MessageAnnotations ma = amqp.getMessageAnnotations(); 133 if (ma != null) { 134 for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) { 135 String key = entry.getKey().toString(); 136 if ("x-opt-jms-type".equals(key) && entry.getValue() != null) { 137 // Legacy annotation, JMSType value will be replaced by Subject further down if also present. 138 jms.setJMSType(entry.getValue().toString()); 139 } 140 141 setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); 142 } 143 } 144 145 final ApplicationProperties ap = amqp.getApplicationProperties(); 146 if (ap != null) { 147 for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) { 148 String key = entry.getKey().toString(); 149 if ("JMSXGroupID".equals(key)) { 150 vendor.setJMSXGroupID(jms, entry.getValue().toString()); 151 } else if ("JMSXGroupSequence".equals(key)) { 152 vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue()); 153 } else if ("JMSXUserID".equals(key)) { 154 vendor.setJMSXUserID(jms, entry.getValue().toString()); 155 } else { 156 setProperty(jms, key, entry.getValue()); 157 } 158 } 159 } 160 161 final Properties properties = amqp.getProperties(); 162 if (properties != null) { 163 if (properties.getMessageId() != null) { 164 jms.setJMSMessageID(properties.getMessageId().toString()); 165 } 166 Binary userId = properties.getUserId(); 167 if (userId != null) { 168 vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8")); 169 } 170 if (properties.getTo() != null) { 171 jms.setJMSDestination(vendor.createDestination(properties.getTo())); 172 } 173 if (properties.getSubject() != null) { 174 jms.setJMSType(properties.getSubject()); 175 } 176 if (properties.getReplyTo() != null) { 177 jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo())); 178 } 179 if (properties.getCorrelationId() != null) { 180 jms.setJMSCorrelationID(properties.getCorrelationId().toString()); 181 } 182 if (properties.getContentType() != null) { 183 jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString()); 184 } 185 if (properties.getContentEncoding() != null) { 186 jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString()); 187 } 188 if (properties.getCreationTime() != null) { 189 jms.setJMSTimestamp(properties.getCreationTime().getTime()); 190 } 191 if (properties.getGroupId() != null) { 192 vendor.setJMSXGroupID(jms, properties.getGroupId()); 193 } 194 if (properties.getGroupSequence() != null) { 195 vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue()); 196 } 197 if (properties.getReplyToGroupId() != null) { 198 jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId()); 199 } 200 if (properties.getAbsoluteExpiryTime() != null) { 201 jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime()); 202 } 203 } 204 205 // If the jms expiration has not yet been set... 206 if (jms.getJMSExpiration() == 0) { 207 // Then lets try to set it based on the message ttl. 208 long ttl = defaultTtl; 209 if (header.getTtl() != null) { 210 ttl = header.getTtl().longValue(); 211 } 212 if (ttl == 0) { 213 jms.setJMSExpiration(0); 214 } else { 215 jms.setJMSExpiration(System.currentTimeMillis() + ttl); 216 } 217 } 218 219 final Footer fp = amqp.getFooter(); 220 if (fp != null) { 221 for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) { 222 String key = entry.getKey().toString(); 223 setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue()); 224 } 225 } 226 } 227 228 private void setProperty(Message msg, String key, Object value) throws JMSException { 229 if (value instanceof UnsignedLong) { 230 long v = ((UnsignedLong) value).longValue(); 231 msg.setLongProperty(key, v); 232 } else if (value instanceof UnsignedInteger) { 233 long v = ((UnsignedInteger) value).longValue(); 234 if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) { 235 msg.setIntProperty(key, (int) v); 236 } else { 237 msg.setLongProperty(key, v); 238 } 239 } else if (value instanceof UnsignedShort) { 240 int v = ((UnsignedShort) value).intValue(); 241 if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) { 242 msg.setShortProperty(key, (short) v); 243 } else { 244 msg.setIntProperty(key, v); 245 } 246 } else if (value instanceof UnsignedByte) { 247 short v = ((UnsignedByte) value).shortValue(); 248 if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) { 249 msg.setByteProperty(key, (byte) v); 250 } else { 251 msg.setShortProperty(key, v); 252 } 253 } else if (value instanceof Symbol) { 254 msg.setStringProperty(key, value.toString()); 255 } else if (value instanceof Decimal128) { 256 msg.setDoubleProperty(key, ((Decimal128) value).doubleValue()); 257 } else if (value instanceof Decimal64) { 258 msg.setDoubleProperty(key, ((Decimal64) value).doubleValue()); 259 } else if (value instanceof Decimal32) { 260 msg.setFloatProperty(key, ((Decimal32) value).floatValue()); 261 } else if (value instanceof Binary) { 262 msg.setStringProperty(key, value.toString()); 263 } else { 264 msg.setObjectProperty(key, value); 265 } 266 } 267}