001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.message;
018
019import java.util.Map;
020import java.util.Set;
021
022import javax.jms.DeliveryMode;
023import javax.jms.JMSException;
024import javax.jms.Message;
025
026import org.apache.qpid.proton.amqp.Binary;
027import org.apache.qpid.proton.amqp.Decimal128;
028import org.apache.qpid.proton.amqp.Decimal32;
029import org.apache.qpid.proton.amqp.Decimal64;
030import org.apache.qpid.proton.amqp.Symbol;
031import org.apache.qpid.proton.amqp.UnsignedByte;
032import org.apache.qpid.proton.amqp.UnsignedInteger;
033import org.apache.qpid.proton.amqp.UnsignedLong;
034import org.apache.qpid.proton.amqp.UnsignedShort;
035import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
036import org.apache.qpid.proton.amqp.messaging.Footer;
037import org.apache.qpid.proton.amqp.messaging.Header;
038import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
039import org.apache.qpid.proton.amqp.messaging.Properties;
040
041public abstract class InboundTransformer {
042
043    JMSVendor vendor;
044
045    public static final String TRANSFORMER_NATIVE = "native";
046    public static final String TRANSFORMER_RAW = "raw";
047    public static final String TRANSFORMER_JMS = "jms";
048
049    String prefixVendor = "JMS_AMQP_";
050    String prefixDeliveryAnnotations = "DA_";
051    String prefixMessageAnnotations = "MA_";
052    String prefixFooter = "FT_";
053
054    int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
055    int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
056    long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
057
058    public InboundTransformer(JMSVendor vendor) {
059        this.vendor = vendor;
060    }
061
062    public abstract Message transform(EncodedMessage amqpMessage) throws Exception;
063
064    public abstract String getTransformerName();
065
066    public abstract InboundTransformer getFallbackTransformer();
067
068    public int getDefaultDeliveryMode() {
069        return defaultDeliveryMode;
070    }
071
072    public void setDefaultDeliveryMode(int defaultDeliveryMode) {
073        this.defaultDeliveryMode = defaultDeliveryMode;
074    }
075
076    public int getDefaultPriority() {
077        return defaultPriority;
078    }
079
080    public void setDefaultPriority(int defaultPriority) {
081        this.defaultPriority = defaultPriority;
082    }
083
084    public long getDefaultTtl() {
085        return defaultTtl;
086    }
087
088    public void setDefaultTtl(long defaultTtl) {
089        this.defaultTtl = defaultTtl;
090    }
091
092    public String getPrefixVendor() {
093        return prefixVendor;
094    }
095
096    public void setPrefixVendor(String prefixVendor) {
097        this.prefixVendor = prefixVendor;
098    }
099
100    public JMSVendor getVendor() {
101        return vendor;
102    }
103
104    public void setVendor(JMSVendor vendor) {
105        this.vendor = vendor;
106    }
107
108    @SuppressWarnings("unchecked")
109    protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
110        Header header = amqp.getHeader();
111        if (header == null) {
112            header = new Header();
113        }
114
115        if (header.getDurable() != null) {
116            jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
117        } else {
118            jms.setJMSDeliveryMode(defaultDeliveryMode);
119        }
120        if (header.getPriority() != null) {
121            jms.setJMSPriority(header.getPriority().intValue());
122        } else {
123            jms.setJMSPriority(defaultPriority);
124        }
125        if (header.getFirstAcquirer() != null) {
126            jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
127        }
128        if (header.getDeliveryCount() != null) {
129            vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
130        }
131
132        final MessageAnnotations ma = amqp.getMessageAnnotations();
133        if (ma != null) {
134            for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
135                String key = entry.getKey().toString();
136                if ("x-opt-jms-type".equals(key) && entry.getValue() != null) {
137                    // Legacy annotation, JMSType value will be replaced by Subject further down if also present.
138                    jms.setJMSType(entry.getValue().toString());
139                }
140
141                setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
142            }
143        }
144
145        final ApplicationProperties ap = amqp.getApplicationProperties();
146        if (ap != null) {
147            for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) {
148                String key = entry.getKey().toString();
149                if ("JMSXGroupID".equals(key)) {
150                    vendor.setJMSXGroupID(jms, entry.getValue().toString());
151                } else if ("JMSXGroupSequence".equals(key)) {
152                    vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue());
153                } else if ("JMSXUserID".equals(key)) {
154                    vendor.setJMSXUserID(jms, entry.getValue().toString());
155                } else {
156                    setProperty(jms, key, entry.getValue());
157                }
158            }
159        }
160
161        final Properties properties = amqp.getProperties();
162        if (properties != null) {
163            if (properties.getMessageId() != null) {
164                jms.setJMSMessageID(properties.getMessageId().toString());
165            }
166            Binary userId = properties.getUserId();
167            if (userId != null) {
168                vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
169            }
170            if (properties.getTo() != null) {
171                jms.setJMSDestination(vendor.createDestination(properties.getTo()));
172            }
173            if (properties.getSubject() != null) {
174                jms.setJMSType(properties.getSubject());
175            }
176            if (properties.getReplyTo() != null) {
177                jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
178            }
179            if (properties.getCorrelationId() != null) {
180                jms.setJMSCorrelationID(properties.getCorrelationId().toString());
181            }
182            if (properties.getContentType() != null) {
183                jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
184            }
185            if (properties.getContentEncoding() != null) {
186                jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
187            }
188            if (properties.getCreationTime() != null) {
189                jms.setJMSTimestamp(properties.getCreationTime().getTime());
190            }
191            if (properties.getGroupId() != null) {
192                vendor.setJMSXGroupID(jms, properties.getGroupId());
193            }
194            if (properties.getGroupSequence() != null) {
195                vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
196            }
197            if (properties.getReplyToGroupId() != null) {
198                jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
199            }
200            if (properties.getAbsoluteExpiryTime() != null) {
201                jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
202            }
203        }
204
205        // If the jms expiration has not yet been set...
206        if (jms.getJMSExpiration() == 0) {
207            // Then lets try to set it based on the message ttl.
208            long ttl = defaultTtl;
209            if (header.getTtl() != null) {
210                ttl = header.getTtl().longValue();
211            }
212            if (ttl == 0) {
213                jms.setJMSExpiration(0);
214            } else {
215                jms.setJMSExpiration(System.currentTimeMillis() + ttl);
216            }
217        }
218
219        final Footer fp = amqp.getFooter();
220        if (fp != null) {
221            for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
222                String key = entry.getKey().toString();
223                setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
224            }
225        }
226    }
227
228    private void setProperty(Message msg, String key, Object value) throws JMSException {
229        if (value instanceof UnsignedLong) {
230            long v = ((UnsignedLong) value).longValue();
231            msg.setLongProperty(key, v);
232        } else if (value instanceof UnsignedInteger) {
233            long v = ((UnsignedInteger) value).longValue();
234            if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) {
235                msg.setIntProperty(key, (int) v);
236            } else {
237                msg.setLongProperty(key, v);
238            }
239        } else if (value instanceof UnsignedShort) {
240            int v = ((UnsignedShort) value).intValue();
241            if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) {
242                msg.setShortProperty(key, (short) v);
243            } else {
244                msg.setIntProperty(key, v);
245            }
246        } else if (value instanceof UnsignedByte) {
247            short v = ((UnsignedByte) value).shortValue();
248            if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) {
249                msg.setByteProperty(key, (byte) v);
250            } else {
251                msg.setShortProperty(key, v);
252            }
253        } else if (value instanceof Symbol) {
254            msg.setStringProperty(key, value.toString());
255        } else if (value instanceof Decimal128) {
256            msg.setDoubleProperty(key, ((Decimal128) value).doubleValue());
257        } else if (value instanceof Decimal64) {
258            msg.setDoubleProperty(key, ((Decimal64) value).doubleValue());
259        } else if (value instanceof Decimal32) {
260            msg.setFloatProperty(key, ((Decimal32) value).floatValue());
261        } else if (value instanceof Binary) {
262            msg.setStringProperty(key, value.toString());
263        } else {
264            msg.setObjectProperty(key, value);
265        }
266    }
267}