001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp.message; 018 019import java.io.UnsupportedEncodingException; 020import java.nio.ByteBuffer; 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.Enumeration; 024import java.util.HashMap; 025 026import javax.jms.BytesMessage; 027import javax.jms.DeliveryMode; 028import javax.jms.Destination; 029import javax.jms.JMSException; 030import javax.jms.MapMessage; 031import javax.jms.Message; 032import javax.jms.MessageEOFException; 033import javax.jms.MessageFormatException; 034import javax.jms.ObjectMessage; 035import javax.jms.Queue; 036import javax.jms.StreamMessage; 037import javax.jms.TemporaryQueue; 038import javax.jms.TemporaryTopic; 039import javax.jms.TextMessage; 040import javax.jms.Topic; 041 042import org.apache.activemq.command.ActiveMQMessage; 043import org.apache.activemq.command.MessageId; 044import org.apache.qpid.proton.amqp.Binary; 045import org.apache.qpid.proton.amqp.Symbol; 046import org.apache.qpid.proton.amqp.UnsignedByte; 047import org.apache.qpid.proton.amqp.UnsignedInteger; 048import org.apache.qpid.proton.amqp.messaging.AmqpSequence; 049import org.apache.qpid.proton.amqp.messaging.AmqpValue; 050import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; 051import org.apache.qpid.proton.amqp.messaging.Data; 052import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; 053import org.apache.qpid.proton.amqp.messaging.Footer; 054import org.apache.qpid.proton.amqp.messaging.Header; 055import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; 056import org.apache.qpid.proton.amqp.messaging.Properties; 057import org.apache.qpid.proton.amqp.messaging.Section; 058import org.apache.qpid.proton.codec.CompositeWritableBuffer; 059import org.apache.qpid.proton.codec.DroppingWritableBuffer; 060import org.apache.qpid.proton.codec.WritableBuffer; 061import org.apache.qpid.proton.message.ProtonJMessage; 062 063public class JMSMappingOutboundTransformer extends OutboundTransformer { 064 065 public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest"); 066 public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to"); 067 068 public static final byte QUEUE_TYPE = 0x00; 069 public static final byte TOPIC_TYPE = 0x01; 070 public static final byte TEMP_QUEUE_TYPE = 0x02; 071 public static final byte TEMP_TOPIC_TYPE = 0x03; 072 073 // Deprecated legacy values used by old QPid AMQP 1.0 JMS client. 074 075 public static final Symbol LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-to-type"); 076 public static final Symbol LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-reply-type"); 077 078 public static final String LEGACY_QUEUE_TYPE = "queue"; 079 public static final String LEGACY_TOPIC_TYPE = "topic"; 080 public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue"; 081 public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic"; 082 083 public JMSMappingOutboundTransformer(JMSVendor vendor) { 084 super(vendor); 085 } 086 087 @Override 088 public EncodedMessage transform(Message msg) throws Exception { 089 if (msg == null) { 090 return null; 091 } 092 093 try { 094 if (msg.getBooleanProperty(prefixVendor + "NATIVE")) { 095 return null; 096 } 097 } catch (MessageFormatException e) { 098 return null; 099 } 100 ProtonJMessage amqp = convert(msg); 101 102 long messageFormat; 103 try { 104 messageFormat = msg.getLongProperty(this.messageFormatKey); 105 } catch (MessageFormatException e) { 106 return null; 107 } 108 109 ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]); 110 final DroppingWritableBuffer overflow = new DroppingWritableBuffer(); 111 int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow)); 112 if (overflow.position() > 0) { 113 buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]); 114 c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer)); 115 } 116 117 return new EncodedMessage(messageFormat, buffer.array(), 0, c); 118 } 119 120 /** 121 * Perform the conversion between JMS Message and Proton Message without 122 * re-encoding it to array. This is needed because some frameworks may elect 123 * to do this on their own way (Netty for instance using Nettybuffers) 124 * 125 * @param msg 126 * @return 127 * @throws Exception 128 */ 129 public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException { 130 Header header = new Header(); 131 Properties props = new Properties(); 132 HashMap<Symbol, Object> daMap = null; 133 HashMap<Symbol, Object> maMap = null; 134 HashMap apMap = null; 135 Section body = null; 136 HashMap footerMap = null; 137 if (msg instanceof BytesMessage) { 138 BytesMessage m = (BytesMessage) msg; 139 byte data[] = new byte[(int) m.getBodyLength()]; 140 m.readBytes(data); 141 m.reset(); // Need to reset after readBytes or future readBytes 142 // calls (ex: redeliveries) will fail and return -1 143 body = new Data(new Binary(data)); 144 } 145 if (msg instanceof TextMessage) { 146 body = new AmqpValue(((TextMessage) msg).getText()); 147 } 148 if (msg instanceof MapMessage) { 149 final HashMap<String, Object> map = new HashMap<String, Object>(); 150 final MapMessage m = (MapMessage) msg; 151 final Enumeration<String> names = m.getMapNames(); 152 while (names.hasMoreElements()) { 153 String key = names.nextElement(); 154 map.put(key, m.getObject(key)); 155 } 156 body = new AmqpValue(map); 157 } 158 if (msg instanceof StreamMessage) { 159 ArrayList<Object> list = new ArrayList<Object>(); 160 final StreamMessage m = (StreamMessage) msg; 161 try { 162 while (true) { 163 list.add(m.readObject()); 164 } 165 } catch (MessageEOFException e) { 166 } 167 body = new AmqpSequence(list); 168 } 169 if (msg instanceof ObjectMessage) { 170 body = new AmqpValue(((ObjectMessage) msg).getObject()); 171 } 172 173 header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false); 174 header.setPriority(new UnsignedByte((byte) msg.getJMSPriority())); 175 if (msg.getJMSType() != null) { 176 props.setSubject(msg.getJMSType()); 177 } 178 if (msg.getJMSMessageID() != null) { 179 ActiveMQMessage amqMsg = (ActiveMQMessage) msg; 180 181 MessageId msgId = amqMsg.getMessageId(); 182 if (msgId.getTextView() != null) { 183 props.setMessageId(msgId.getTextView()); 184 } else { 185 props.setMessageId(msgId.toString()); 186 } 187 } 188 if (msg.getJMSDestination() != null) { 189 props.setTo(vendor.toAddress(msg.getJMSDestination())); 190 if (maMap == null) { 191 maMap = new HashMap<Symbol, Object>(); 192 } 193 maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSDestination())); 194 195 // Deprecated: used by legacy QPid AMQP 1.0 JMS client 196 maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSDestination())); 197 } 198 if (msg.getJMSReplyTo() != null) { 199 props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo())); 200 if (maMap == null) { 201 maMap = new HashMap<Symbol, Object>(); 202 } 203 maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSReplyTo())); 204 205 // Deprecated: used by legacy QPid AMQP 1.0 JMS client 206 maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo())); 207 } 208 if (msg.getJMSCorrelationID() != null) { 209 props.setCorrelationId(msg.getJMSCorrelationID()); 210 } 211 if (msg.getJMSExpiration() != 0) { 212 long ttl = msg.getJMSExpiration() - System.currentTimeMillis(); 213 if (ttl < 0) { 214 ttl = 1; 215 } 216 header.setTtl(new UnsignedInteger((int) ttl)); 217 218 props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration())); 219 } 220 if (msg.getJMSTimestamp() != 0) { 221 props.setCreationTime(new Date(msg.getJMSTimestamp())); 222 } 223 224 final Enumeration<String> keys = msg.getPropertyNames(); 225 while (keys.hasMoreElements()) { 226 String key = keys.nextElement(); 227 if (key.equals(messageFormatKey) || key.equals(nativeKey)) { 228 // skip.. 229 } else if (key.equals(firstAcquirerKey)) { 230 header.setFirstAcquirer(msg.getBooleanProperty(key)); 231 } else if (key.startsWith("JMSXDeliveryCount")) { 232 // The AMQP delivery-count field only includes prior failed delivery attempts, 233 // whereas JMSXDeliveryCount includes the first/current delivery attempt. 234 int amqpDeliveryCount = msg.getIntProperty(key) - 1; 235 if (amqpDeliveryCount > 0) { 236 header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); 237 } 238 } else if (key.startsWith("JMSXUserID")) { 239 String value = msg.getStringProperty(key); 240 props.setUserId(new Binary(value.getBytes("UTF-8"))); 241 } else if (key.startsWith("JMSXGroupID")) { 242 String value = msg.getStringProperty(key); 243 props.setGroupId(value); 244 if (apMap == null) { 245 apMap = new HashMap(); 246 } 247 apMap.put(key, value); 248 } else if (key.startsWith("JMSXGroupSeq")) { 249 UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key)); 250 props.setGroupSequence(value); 251 if (apMap == null) { 252 apMap = new HashMap(); 253 } 254 apMap.put(key, value); 255 } else if (key.startsWith(prefixDeliveryAnnotationsKey)) { 256 if (daMap == null) { 257 daMap = new HashMap<Symbol, Object>(); 258 } 259 String name = key.substring(prefixDeliveryAnnotationsKey.length()); 260 daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key)); 261 } else if (key.startsWith(prefixMessageAnnotationsKey)) { 262 if (maMap == null) { 263 maMap = new HashMap<Symbol, Object>(); 264 } 265 String name = key.substring(prefixMessageAnnotationsKey.length()); 266 maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key)); 267 } else if (key.equals(contentTypeKey)) { 268 props.setContentType(Symbol.getSymbol(msg.getStringProperty(key))); 269 } else if (key.equals(contentEncodingKey)) { 270 props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key))); 271 } else if (key.equals(replyToGroupIDKey)) { 272 props.setReplyToGroupId(msg.getStringProperty(key)); 273 } else if (key.startsWith(prefixFooterKey)) { 274 if (footerMap == null) { 275 footerMap = new HashMap(); 276 } 277 String name = key.substring(prefixFooterKey.length()); 278 footerMap.put(name, msg.getObjectProperty(key)); 279 } else { 280 if (apMap == null) { 281 apMap = new HashMap(); 282 } 283 apMap.put(key, msg.getObjectProperty(key)); 284 } 285 } 286 287 MessageAnnotations ma = null; 288 if (maMap != null) { 289 ma = new MessageAnnotations(maMap); 290 } 291 DeliveryAnnotations da = null; 292 if (daMap != null) { 293 da = new DeliveryAnnotations(daMap); 294 } 295 ApplicationProperties ap = null; 296 if (apMap != null) { 297 ap = new ApplicationProperties(apMap); 298 } 299 Footer footer = null; 300 if (footerMap != null) { 301 footer = new Footer(footerMap); 302 } 303 304 return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer); 305 } 306 307 private static byte destinationType(Destination destination) { 308 if (destination instanceof Queue) { 309 if (destination instanceof TemporaryQueue) { 310 return TEMP_QUEUE_TYPE; 311 } else { 312 return QUEUE_TYPE; 313 } 314 } else if (destination instanceof Topic) { 315 if (destination instanceof TemporaryTopic) { 316 return TEMP_TOPIC_TYPE; 317 } else { 318 return TOPIC_TYPE; 319 } 320 } 321 322 throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer."); 323 } 324 325 // Used by legacy QPid AMQP 1.0 JMS client. 326 @Deprecated 327 private static String destinationAttributes(Destination destination) { 328 if (destination instanceof Queue) { 329 if (destination instanceof TemporaryQueue) { 330 return LEGACY_TEMP_QUEUE_TYPE; 331 } else { 332 return LEGACY_QUEUE_TYPE; 333 } 334 } else if (destination instanceof Topic) { 335 if (destination instanceof TemporaryTopic) { 336 return LEGACY_TEMP_TOPIC_TYPE; 337 } else { 338 return LEGACY_TOPIC_TYPE; 339 } 340 } 341 342 throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer."); 343 } 344}