001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.message;
018
019import java.io.UnsupportedEncodingException;
020import java.nio.ByteBuffer;
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.Enumeration;
024import java.util.HashMap;
025
026import javax.jms.BytesMessage;
027import javax.jms.DeliveryMode;
028import javax.jms.Destination;
029import javax.jms.JMSException;
030import javax.jms.MapMessage;
031import javax.jms.Message;
032import javax.jms.MessageEOFException;
033import javax.jms.MessageFormatException;
034import javax.jms.ObjectMessage;
035import javax.jms.Queue;
036import javax.jms.StreamMessage;
037import javax.jms.TemporaryQueue;
038import javax.jms.TemporaryTopic;
039import javax.jms.TextMessage;
040import javax.jms.Topic;
041
042import org.apache.activemq.command.ActiveMQMessage;
043import org.apache.activemq.command.MessageId;
044import org.apache.qpid.proton.amqp.Binary;
045import org.apache.qpid.proton.amqp.Symbol;
046import org.apache.qpid.proton.amqp.UnsignedByte;
047import org.apache.qpid.proton.amqp.UnsignedInteger;
048import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
049import org.apache.qpid.proton.amqp.messaging.AmqpValue;
050import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
051import org.apache.qpid.proton.amqp.messaging.Data;
052import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
053import org.apache.qpid.proton.amqp.messaging.Footer;
054import org.apache.qpid.proton.amqp.messaging.Header;
055import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
056import org.apache.qpid.proton.amqp.messaging.Properties;
057import org.apache.qpid.proton.amqp.messaging.Section;
058import org.apache.qpid.proton.codec.CompositeWritableBuffer;
059import org.apache.qpid.proton.codec.DroppingWritableBuffer;
060import org.apache.qpid.proton.codec.WritableBuffer;
061import org.apache.qpid.proton.message.ProtonJMessage;
062
063public class JMSMappingOutboundTransformer extends OutboundTransformer {
064
065    public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest");
066    public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to");
067
068    public static final byte QUEUE_TYPE = 0x00;
069    public static final byte TOPIC_TYPE = 0x01;
070    public static final byte TEMP_QUEUE_TYPE = 0x02;
071    public static final byte TEMP_TOPIC_TYPE = 0x03;
072
073    // Deprecated legacy values used by old QPid AMQP 1.0 JMS client.
074
075    public static final Symbol LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-to-type");
076    public static final Symbol LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-reply-type");
077
078    public static final String LEGACY_QUEUE_TYPE = "queue";
079    public static final String LEGACY_TOPIC_TYPE = "topic";
080    public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue";
081    public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic";
082
083    public JMSMappingOutboundTransformer(JMSVendor vendor) {
084        super(vendor);
085    }
086
087    @Override
088    public EncodedMessage transform(Message msg) throws Exception {
089        if (msg == null) {
090            return null;
091        }
092
093        try {
094            if (msg.getBooleanProperty(prefixVendor + "NATIVE")) {
095                return null;
096            }
097        } catch (MessageFormatException e) {
098            return null;
099        }
100        ProtonJMessage amqp = convert(msg);
101
102        long messageFormat;
103        try {
104            messageFormat = msg.getLongProperty(this.messageFormatKey);
105        } catch (MessageFormatException e) {
106            return null;
107        }
108
109        ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
110        final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
111        int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
112        if (overflow.position() > 0) {
113            buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
114            c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
115        }
116
117        return new EncodedMessage(messageFormat, buffer.array(), 0, c);
118    }
119
120    /**
121     * Perform the conversion between JMS Message and Proton Message without
122     * re-encoding it to array. This is needed because some frameworks may elect
123     * to do this on their own way (Netty for instance using Nettybuffers)
124     *
125     * @param msg
126     * @return
127     * @throws Exception
128     */
129    public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
130        Header header = new Header();
131        Properties props = new Properties();
132        HashMap<Symbol, Object> daMap = null;
133        HashMap<Symbol, Object> maMap = null;
134        HashMap apMap = null;
135        Section body = null;
136        HashMap footerMap = null;
137        if (msg instanceof BytesMessage) {
138            BytesMessage m = (BytesMessage) msg;
139            byte data[] = new byte[(int) m.getBodyLength()];
140            m.readBytes(data);
141            m.reset(); // Need to reset after readBytes or future readBytes
142                       // calls (ex: redeliveries) will fail and return -1
143            body = new Data(new Binary(data));
144        }
145        if (msg instanceof TextMessage) {
146            body = new AmqpValue(((TextMessage) msg).getText());
147        }
148        if (msg instanceof MapMessage) {
149            final HashMap<String, Object> map = new HashMap<String, Object>();
150            final MapMessage m = (MapMessage) msg;
151            final Enumeration<String> names = m.getMapNames();
152            while (names.hasMoreElements()) {
153                String key = names.nextElement();
154                map.put(key, m.getObject(key));
155            }
156            body = new AmqpValue(map);
157        }
158        if (msg instanceof StreamMessage) {
159            ArrayList<Object> list = new ArrayList<Object>();
160            final StreamMessage m = (StreamMessage) msg;
161            try {
162                while (true) {
163                    list.add(m.readObject());
164                }
165            } catch (MessageEOFException e) {
166            }
167            body = new AmqpSequence(list);
168        }
169        if (msg instanceof ObjectMessage) {
170            body = new AmqpValue(((ObjectMessage) msg).getObject());
171        }
172
173        header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
174        header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
175        if (msg.getJMSType() != null) {
176            props.setSubject(msg.getJMSType());
177        }
178        if (msg.getJMSMessageID() != null) {
179            ActiveMQMessage amqMsg = (ActiveMQMessage) msg;
180
181            MessageId msgId = amqMsg.getMessageId();
182            if (msgId.getTextView() != null) {
183                props.setMessageId(msgId.getTextView());
184            } else {
185                props.setMessageId(msgId.toString());
186            }
187        }
188        if (msg.getJMSDestination() != null) {
189            props.setTo(vendor.toAddress(msg.getJMSDestination()));
190            if (maMap == null) {
191                maMap = new HashMap<Symbol, Object>();
192            }
193            maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSDestination()));
194
195            // Deprecated: used by legacy QPid AMQP 1.0 JMS client
196            maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSDestination()));
197        }
198        if (msg.getJMSReplyTo() != null) {
199            props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
200            if (maMap == null) {
201                maMap = new HashMap<Symbol, Object>();
202            }
203            maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSReplyTo()));
204
205            // Deprecated: used by legacy QPid AMQP 1.0 JMS client
206            maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo()));
207        }
208        if (msg.getJMSCorrelationID() != null) {
209            props.setCorrelationId(msg.getJMSCorrelationID());
210        }
211        if (msg.getJMSExpiration() != 0) {
212            long ttl = msg.getJMSExpiration() - System.currentTimeMillis();
213            if (ttl < 0) {
214                ttl = 1;
215            }
216            header.setTtl(new UnsignedInteger((int) ttl));
217
218            props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
219        }
220        if (msg.getJMSTimestamp() != 0) {
221            props.setCreationTime(new Date(msg.getJMSTimestamp()));
222        }
223
224        final Enumeration<String> keys = msg.getPropertyNames();
225        while (keys.hasMoreElements()) {
226            String key = keys.nextElement();
227            if (key.equals(messageFormatKey) || key.equals(nativeKey)) {
228                // skip..
229            } else if (key.equals(firstAcquirerKey)) {
230                header.setFirstAcquirer(msg.getBooleanProperty(key));
231            } else if (key.startsWith("JMSXDeliveryCount")) {
232                // The AMQP delivery-count field only includes prior failed delivery attempts,
233                // whereas JMSXDeliveryCount includes the first/current delivery attempt.
234                int amqpDeliveryCount = msg.getIntProperty(key) - 1;
235                if (amqpDeliveryCount > 0) {
236                    header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
237                }
238            } else if (key.startsWith("JMSXUserID")) {
239                String value = msg.getStringProperty(key);
240                props.setUserId(new Binary(value.getBytes("UTF-8")));
241            } else if (key.startsWith("JMSXGroupID")) {
242                String value = msg.getStringProperty(key);
243                props.setGroupId(value);
244                if (apMap == null) {
245                    apMap = new HashMap();
246                }
247                apMap.put(key, value);
248            } else if (key.startsWith("JMSXGroupSeq")) {
249                UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key));
250                props.setGroupSequence(value);
251                if (apMap == null) {
252                    apMap = new HashMap();
253                }
254                apMap.put(key, value);
255            } else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
256                if (daMap == null) {
257                    daMap = new HashMap<Symbol, Object>();
258                }
259                String name = key.substring(prefixDeliveryAnnotationsKey.length());
260                daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
261            } else if (key.startsWith(prefixMessageAnnotationsKey)) {
262                if (maMap == null) {
263                    maMap = new HashMap<Symbol, Object>();
264                }
265                String name = key.substring(prefixMessageAnnotationsKey.length());
266                maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
267            } else if (key.equals(contentTypeKey)) {
268                props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
269            } else if (key.equals(contentEncodingKey)) {
270                props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
271            } else if (key.equals(replyToGroupIDKey)) {
272                props.setReplyToGroupId(msg.getStringProperty(key));
273            } else if (key.startsWith(prefixFooterKey)) {
274                if (footerMap == null) {
275                    footerMap = new HashMap();
276                }
277                String name = key.substring(prefixFooterKey.length());
278                footerMap.put(name, msg.getObjectProperty(key));
279            } else {
280                if (apMap == null) {
281                    apMap = new HashMap();
282                }
283                apMap.put(key, msg.getObjectProperty(key));
284            }
285        }
286
287        MessageAnnotations ma = null;
288        if (maMap != null) {
289            ma = new MessageAnnotations(maMap);
290        }
291        DeliveryAnnotations da = null;
292        if (daMap != null) {
293            da = new DeliveryAnnotations(daMap);
294        }
295        ApplicationProperties ap = null;
296        if (apMap != null) {
297            ap = new ApplicationProperties(apMap);
298        }
299        Footer footer = null;
300        if (footerMap != null) {
301            footer = new Footer(footerMap);
302        }
303
304        return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
305    }
306
307    private static byte destinationType(Destination destination) {
308        if (destination instanceof Queue) {
309            if (destination instanceof TemporaryQueue) {
310                return TEMP_QUEUE_TYPE;
311            } else {
312                return QUEUE_TYPE;
313            }
314        } else if (destination instanceof Topic) {
315            if (destination instanceof TemporaryTopic) {
316                return TEMP_TOPIC_TYPE;
317            } else {
318                return TOPIC_TYPE;
319            }
320        }
321
322        throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
323    }
324
325    // Used by legacy QPid AMQP 1.0 JMS client.
326    @Deprecated
327    private static String destinationAttributes(Destination destination) {
328        if (destination instanceof Queue) {
329            if (destination instanceof TemporaryQueue) {
330                return LEGACY_TEMP_QUEUE_TYPE;
331            } else {
332                return LEGACY_QUEUE_TYPE;
333            }
334        } else if (destination instanceof Topic) {
335            if (destination instanceof TemporaryTopic) {
336                return LEGACY_TEMP_TOPIC_TYPE;
337            } else {
338                return LEGACY_TOPIC_TYPE;
339            }
340        }
341
342        throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
343    }
344}