001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.command; 019 020import java.io.DataInputStream; 021import java.io.DataOutputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.ObjectOutputStream; 025import java.io.OutputStream; 026import java.io.Serializable; 027import java.util.zip.DeflaterOutputStream; 028import java.util.zip.InflaterInputStream; 029 030import javax.jms.JMSException; 031import javax.jms.ObjectMessage; 032 033import org.apache.activemq.ActiveMQConnection; 034import org.apache.activemq.util.ByteArrayInputStream; 035import org.apache.activemq.util.ByteArrayOutputStream; 036import org.apache.activemq.util.ByteSequence; 037import org.apache.activemq.util.ClassLoadingAwareObjectInputStream; 038import org.apache.activemq.util.JMSExceptionSupport; 039import org.apache.activemq.wireformat.WireFormat; 040 041/** 042 * An <CODE>ObjectMessage</CODE> object is used to send a message that 043 * contains a serializable object in the Java programming language ("Java 044 * object"). It inherits from the <CODE>Message</CODE> interface and adds a 045 * body containing a single reference to an object. Only 046 * <CODE>Serializable</CODE> Java objects can be used. <p/> 047 * <P> 048 * If a collection of Java objects must be sent, one of the 049 * <CODE>Collection</CODE> classes provided since JDK 1.2 can be used. <p/> 050 * <P> 051 * When a client receives an <CODE>ObjectMessage</CODE>, it is in read-only 052 * mode. If a client attempts to write to the message at this point, a 053 * <CODE>MessageNotWriteableException</CODE> is thrown. If 054 * <CODE>clearBody</CODE> is called, the message can now be both read from and 055 * written to. 056 * 057 * @openwire:marshaller code="26" 058 * @see javax.jms.Session#createObjectMessage() 059 * @see javax.jms.Session#createObjectMessage(Serializable) 060 * @see javax.jms.BytesMessage 061 * @see javax.jms.MapMessage 062 * @see javax.jms.Message 063 * @see javax.jms.StreamMessage 064 * @see javax.jms.TextMessage 065 */ 066public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage { 067 068 // TODO: verify classloader 069 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE; 070 static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader(); 071 072 protected transient Serializable object; 073 074 public Message copy() { 075 ActiveMQObjectMessage copy = new ActiveMQObjectMessage(); 076 copy(copy); 077 return copy; 078 } 079 080 private void copy(ActiveMQObjectMessage copy) { 081 ActiveMQConnection connection = getConnection(); 082 if (connection == null || !connection.isObjectMessageSerializationDefered()) { 083 storeContent(); 084 copy.object = null; 085 } else { 086 copy.object = object; 087 } 088 super.copy(copy); 089 090 } 091 092 @Override 093 public void storeContentAndClear() { 094 storeContent(); 095 object = null; 096 } 097 098 @Override 099 public void storeContent() { 100 ByteSequence bodyAsBytes = getContent(); 101 if (bodyAsBytes == null && object != null) { 102 try { 103 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 104 OutputStream os = bytesOut; 105 ActiveMQConnection connection = getConnection(); 106 if (connection != null && connection.isUseCompression()) { 107 compressed = true; 108 os = new DeflaterOutputStream(os); 109 } 110 DataOutputStream dataOut = new DataOutputStream(os); 111 ObjectOutputStream objOut = new ObjectOutputStream(dataOut); 112 objOut.writeObject(object); 113 objOut.flush(); 114 objOut.reset(); 115 objOut.close(); 116 setContent(bytesOut.toByteSequence()); 117 } catch (IOException ioe) { 118 throw new RuntimeException(ioe.getMessage(), ioe); 119 } 120 } 121 } 122 123 public byte getDataStructureType() { 124 return DATA_STRUCTURE_TYPE; 125 } 126 127 public String getJMSXMimeType() { 128 return "jms/object-message"; 129 } 130 131 /** 132 * Clears out the message body. Clearing a message's body does not clear its 133 * header values or property entries. <p/> 134 * <P> 135 * If this message body was read-only, calling this method leaves the 136 * message body in the same state as an empty body in a newly created 137 * message. 138 * 139 * @throws JMSException if the JMS provider fails to clear the message body 140 * due to some internal error. 141 */ 142 143 public void clearBody() throws JMSException { 144 super.clearBody(); 145 this.object = null; 146 } 147 148 /** 149 * Sets the serializable object containing this message's data. It is 150 * important to note that an <CODE>ObjectMessage</CODE> contains a 151 * snapshot of the object at the time <CODE>setObject()</CODE> is called; 152 * subsequent modifications of the object will have no effect on the 153 * <CODE>ObjectMessage</CODE> body. 154 * 155 * @param newObject the message's data 156 * @throws JMSException if the JMS provider fails to set the object due to 157 * some internal error. 158 * @throws javax.jms.MessageFormatException if object serialization fails. 159 * @throws javax.jms.MessageNotWriteableException if the message is in 160 * read-only mode. 161 */ 162 163 public void setObject(Serializable newObject) throws JMSException { 164 checkReadOnlyBody(); 165 this.object = newObject; 166 setContent(null); 167 ActiveMQConnection connection = getConnection(); 168 if (connection == null || !connection.isObjectMessageSerializationDefered()) { 169 storeContent(); 170 } 171 } 172 173 /** 174 * Gets the serializable object containing this message's data. The default 175 * value is null. 176 * 177 * @return the serializable object containing this message's data 178 * @throws JMSException 179 */ 180 public Serializable getObject() throws JMSException { 181 if (object == null && getContent() != null) { 182 try { 183 ByteSequence content = getContent(); 184 InputStream is = new ByteArrayInputStream(content); 185 if (isCompressed()) { 186 is = new InflaterInputStream(is); 187 } 188 DataInputStream dataIn = new DataInputStream(is); 189 ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn); 190 try { 191 object = (Serializable)objIn.readObject(); 192 } catch (ClassNotFoundException ce) { 193 throw JMSExceptionSupport.create("Failed to build body from content. Serializable class not available to broker. Reason: " + ce, ce); 194 } finally { 195 dataIn.close(); 196 } 197 } catch (IOException e) { 198 throw JMSExceptionSupport.create("Failed to build body from bytes. Reason: " + e, e); 199 } 200 } 201 return this.object; 202 } 203 204 @Override 205 public void beforeMarshall(WireFormat wireFormat) throws IOException { 206 super.beforeMarshall(wireFormat); 207 // may have initiated on vm transport with deferred marshalling 208 storeContent(); 209 } 210 211 public void clearMarshalledState() throws JMSException { 212 super.clearMarshalledState(); 213 this.object = null; 214 } 215 216 public void onMessageRolledBack() { 217 super.onMessageRolledBack(); 218 219 // lets force the object to be deserialized again - as we could have 220 // changed the object 221 object = null; 222 } 223 224 @Override 225 public void compress() throws IOException { 226 storeContent(); 227 super.compress(); 228 } 229 230 public String toString() { 231 try { 232 getObject(); 233 } catch (JMSException e) { 234 } 235 return super.toString(); 236 } 237}