001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import org.apache.activemq.command.ActiveMQBytesMessage;
020import org.apache.activemq.command.ActiveMQDestination;
021import org.apache.activemq.command.ActiveMQMessage;
022import org.apache.activemq.command.ActiveMQTextMessage;
023import org.apache.activemq.util.ByteArrayOutputStream;
024import org.apache.activemq.util.ByteSequence;
025
026import javax.jms.Destination;
027import javax.jms.JMSException;
028import java.io.DataOutputStream;
029import java.io.IOException;
030import java.util.HashMap;
031import java.util.Map;
032
033/**
034 * Implements ActiveMQ 4.0 translations
035 */
036public class LegacyFrameTranslator implements FrameTranslator {
037
038    public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
039        final Map<?, ?> headers = command.getHeaders();
040        final ActiveMQMessage msg;
041        /*
042         * To reduce the complexity of this method perhaps a Chain of Responsibility
043         * would be a better implementation
044         */
045        if (headers.containsKey(Stomp.Headers.AMQ_MESSAGE_TYPE)) {
046            String intendedType = (String)headers.get(Stomp.Headers.AMQ_MESSAGE_TYPE);
047            if(intendedType.equalsIgnoreCase("text")){
048                ActiveMQTextMessage text = new ActiveMQTextMessage();
049                try {
050                    ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
051                    DataOutputStream data = new DataOutputStream(bytes);
052                    data.writeInt(command.getContent().length);
053                    data.write(command.getContent());
054                    text.setContent(bytes.toByteSequence());
055                    data.close();
056                } catch (Throwable e) {
057                    throw new ProtocolException("Text could not bet set: " + e, false, e);
058                }
059                msg = text;
060            } else if(intendedType.equalsIgnoreCase("bytes")) {
061                ActiveMQBytesMessage byteMessage = new ActiveMQBytesMessage();
062                byteMessage.writeBytes(command.getContent());
063                msg = byteMessage;
064            } else {
065                throw new ProtocolException("Unsupported message type '"+intendedType+"'",false);
066            }
067        }else if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
068            headers.remove(Stomp.Headers.CONTENT_LENGTH);
069            ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
070            bm.writeBytes(command.getContent());
071            msg = bm;
072        } else {
073            ActiveMQTextMessage text = new ActiveMQTextMessage();
074            try {
075                ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
076                DataOutputStream data = new DataOutputStream(bytes);
077                data.writeInt(command.getContent().length);
078                data.write(command.getContent());
079                text.setContent(bytes.toByteSequence());
080                data.close();
081            } catch (Throwable e) {
082                throw new ProtocolException("Text could not bet set: " + e, false, e);
083            }
084            msg = text;
085        }
086        FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
087        return msg;
088    }
089
090    public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException {
091        StompFrame command = new StompFrame();
092        command.setAction(Stomp.Responses.MESSAGE);
093        Map<String, String> headers = new HashMap<String, String>(25);
094        command.setHeaders(headers);
095
096        FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
097
098        if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
099
100            if (!message.isCompressed() && message.getContent() != null) {
101                ByteSequence msgContent = message.getContent();
102                if (msgContent.getLength() > 4) {
103                    byte[] content = new byte[msgContent.getLength() - 4];
104                    System.arraycopy(msgContent.data, 4, content, 0, content.length);
105                    command.setContent(content);
106                }
107            } else {
108                ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
109                String messageText = msg.getText();
110                if (messageText != null) {
111                    command.setContent(msg.getText().getBytes("UTF-8"));
112                }
113            }
114
115        } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
116
117            ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
118            msg.setReadOnlyBody(true);
119            byte[] data = new byte[(int)msg.getBodyLength()];
120            msg.readBytes(data);
121
122            headers.put(Stomp.Headers.CONTENT_LENGTH, Integer.toString(data.length));
123            command.setContent(data);
124        }
125
126        return command;
127    }
128
129    public String convertDestination(ProtocolConverter converter, Destination d) {
130        if (d == null) {
131            return null;
132        }
133        ActiveMQDestination activeMQDestination = (ActiveMQDestination)d;
134        String physicalName = activeMQDestination.getPhysicalName();
135
136        String rc = converter.getCreatedTempDestinationName(activeMQDestination);
137        if( rc!=null ) {
138            return rc;
139        }
140
141        StringBuilder buffer = new StringBuilder();
142        if (activeMQDestination.isQueue()) {
143            if (activeMQDestination.isTemporary()) {
144                buffer.append("/remote-temp-queue/");
145            } else {
146                buffer.append("/queue/");
147            }
148        } else {
149            if (activeMQDestination.isTemporary()) {
150                buffer.append("/remote-temp-topic/");
151            } else {
152                buffer.append("/topic/");
153            }
154        }
155        buffer.append(physicalName);
156        return buffer.toString();
157    }
158
159    public ActiveMQDestination convertDestination(ProtocolConverter converter, String name, boolean forceFallback) throws ProtocolException {
160        if (name == null) {
161            return null;
162        }
163
164        // in case of space padding by a client we trim for the initial detection, on fallback use
165        // the un-trimmed value.
166        String originalName = name;
167        name = name.trim();
168
169        if (name.startsWith("/queue/")) {
170            String qName = name.substring("/queue/".length(), name.length());
171            return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE);
172        } else if (name.startsWith("/topic/")) {
173            String tName = name.substring("/topic/".length(), name.length());
174            return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE);
175        } else if (name.startsWith("/remote-temp-queue/")) {
176            String tName = name.substring("/remote-temp-queue/".length(), name.length());
177            return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE);
178        } else if (name.startsWith("/remote-temp-topic/")) {
179            String tName = name.substring("/remote-temp-topic/".length(), name.length());
180            return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE);
181        } else if (name.startsWith("/temp-queue/")) {
182            return converter.createTempDestination(name, false);
183        } else if (name.startsWith("/temp-topic/")) {
184            return converter.createTempDestination(name, true);
185        } else {
186            if (forceFallback) {
187                try {
188                    ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(originalName);
189                    if (fallback != null) {
190                        return fallback;
191                    }
192                } catch (JMSException e) {
193                    throw new ProtocolException("Illegal destination name: [" + originalName + "] -- ActiveMQ STOMP destinations "
194                            + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/", false, e);
195                }
196            }
197            throw new ProtocolException("Illegal destination name: [" + originalName + "] -- ActiveMQ STOMP destinations "
198                                        + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
199        }
200    }
201
202
203}