001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.command; 018 019import java.io.DataInputStream; 020import java.io.DataOutputStream; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.OutputStream; 024import java.util.HashMap; 025import java.util.zip.DeflaterOutputStream; 026import java.util.zip.InflaterInputStream; 027 028import javax.jms.JMSException; 029import javax.jms.MessageNotWriteableException; 030import javax.jms.TextMessage; 031 032import org.apache.activemq.ActiveMQConnection; 033import org.apache.activemq.util.ByteArrayInputStream; 034import org.apache.activemq.util.ByteArrayOutputStream; 035import org.apache.activemq.util.ByteSequence; 036import org.apache.activemq.util.JMSExceptionSupport; 037import org.apache.activemq.util.MarshallingSupport; 038import org.apache.activemq.wireformat.WireFormat; 039 040/** 041 * @openwire:marshaller code="28" 042 * 043 */ 044public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage { 045 046 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE; 047 048 protected String text; 049 050 public Message copy() { 051 ActiveMQTextMessage copy = new ActiveMQTextMessage(); 052 copy(copy); 053 return copy; 054 } 055 056 private void copy(ActiveMQTextMessage copy) { 057 super.copy(copy); 058 copy.text = text; 059 } 060 061 public byte getDataStructureType() { 062 return DATA_STRUCTURE_TYPE; 063 } 064 065 public String getJMSXMimeType() { 066 return "jms/text-message"; 067 } 068 069 public void setText(String text) throws MessageNotWriteableException { 070 checkReadOnlyBody(); 071 this.text = text; 072 setContent(null); 073 } 074 075 public String getText() throws JMSException { 076 if (text == null && getContent() != null) { 077 text = decodeContent(); 078 setContent(null); 079 setCompressed(false); 080 } 081 return text; 082 } 083 084 private String decodeContent() throws JMSException { 085 String text = null; 086 if (getContent() != null) { 087 InputStream is = null; 088 try { 089 ByteSequence bodyAsBytes = getContent(); 090 if (bodyAsBytes != null) { 091 is = new ByteArrayInputStream(bodyAsBytes); 092 if (isCompressed()) { 093 is = new InflaterInputStream(is); 094 } 095 DataInputStream dataIn = new DataInputStream(is); 096 text = MarshallingSupport.readUTF8(dataIn); 097 dataIn.close(); 098 } 099 } catch (IOException ioe) { 100 throw JMSExceptionSupport.create(ioe); 101 } finally { 102 if (is != null) { 103 try { 104 is.close(); 105 } catch (IOException e) { 106 // ignore 107 } 108 } 109 } 110 } 111 return text; 112 } 113 114 public void beforeMarshall(WireFormat wireFormat) throws IOException { 115 super.beforeMarshall(wireFormat); 116 storeContent(); 117 } 118 119 @Override 120 public void storeContentAndClear() { 121 storeContent(); 122 text=null; 123 } 124 125 @Override 126 public void storeContent() { 127 try { 128 ByteSequence content = getContent(); 129 if (content == null && text != null) { 130 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 131 OutputStream os = bytesOut; 132 ActiveMQConnection connection = getConnection(); 133 if (connection != null && connection.isUseCompression()) { 134 compressed = true; 135 os = new DeflaterOutputStream(os); 136 } 137 DataOutputStream dataOut = new DataOutputStream(os); 138 MarshallingSupport.writeUTF8(dataOut, this.text); 139 dataOut.close(); 140 setContent(bytesOut.toByteSequence()); 141 } 142 } catch (IOException e) { 143 throw new RuntimeException(e); 144 } 145 } 146 147 // see https://issues.apache.org/activemq/browse/AMQ-2103 148 // and https://issues.apache.org/activemq/browse/AMQ-2966 149 public void clearMarshalledState() throws JMSException { 150 super.clearMarshalledState(); 151 this.text = null; 152 } 153 154 /** 155 * Clears out the message body. Clearing a message's body does not clear its 156 * header values or property entries. <p/> 157 * <P> 158 * If this message body was read-only, calling this method leaves the 159 * message body in the same state as an empty body in a newly created 160 * message. 161 * 162 * @throws JMSException if the JMS provider fails to clear the message body 163 * due to some internal error. 164 */ 165 public void clearBody() throws JMSException { 166 super.clearBody(); 167 this.text = null; 168 } 169 170 public int getSize() { 171 if (size == 0 && content == null && text != null) { 172 size = getMinimumMessageSize(); 173 if (marshalledProperties != null) { 174 size += marshalledProperties.getLength(); 175 } 176 size += text.length() * 2; 177 } 178 return super.getSize(); 179 } 180 181 public String toString() { 182 try { 183 String text = this.text; 184 if( text == null ) { 185 text = decodeContent(); 186 } 187 if (text != null) { 188 text = MarshallingSupport.truncate64(text); 189 HashMap<String, Object> overrideFields = new HashMap<String, Object>(); 190 overrideFields.put("text", text); 191 return super.toString(overrideFields); 192 } 193 } catch (JMSException e) { 194 } 195 return super.toString(); 196 } 197}