001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.command;
019
020import java.io.DataInputStream;
021import java.io.DataOutputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.ObjectOutputStream;
025import java.io.OutputStream;
026import java.io.Serializable;
027import java.util.zip.DeflaterOutputStream;
028import java.util.zip.InflaterInputStream;
029
030import javax.jms.JMSException;
031import javax.jms.ObjectMessage;
032
033import org.apache.activemq.ActiveMQConnection;
034import org.apache.activemq.util.ByteArrayInputStream;
035import org.apache.activemq.util.ByteArrayOutputStream;
036import org.apache.activemq.util.ByteSequence;
037import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
038import org.apache.activemq.util.JMSExceptionSupport;
039import org.apache.activemq.wireformat.WireFormat;
040
041/**
042 * An <CODE>ObjectMessage</CODE> object is used to send a message that
043 * contains a serializable object in the Java programming language ("Java
044 * object"). It inherits from the <CODE>Message</CODE> interface and adds a
045 * body containing a single reference to an object. Only
046 * <CODE>Serializable</CODE> Java objects can be used. <p/>
047 * <P>
048 * If a collection of Java objects must be sent, one of the
049 * <CODE>Collection</CODE> classes provided since JDK 1.2 can be used. <p/>
050 * <P>
051 * When a client receives an <CODE>ObjectMessage</CODE>, it is in read-only
052 * mode. If a client attempts to write to the message at this point, a
053 * <CODE>MessageNotWriteableException</CODE> is thrown. If
054 * <CODE>clearBody</CODE> is called, the message can now be both read from and
055 * written to.
056 *
057 * @openwire:marshaller code="26"
058 * @see javax.jms.Session#createObjectMessage()
059 * @see javax.jms.Session#createObjectMessage(Serializable)
060 * @see javax.jms.BytesMessage
061 * @see javax.jms.MapMessage
062 * @see javax.jms.Message
063 * @see javax.jms.StreamMessage
064 * @see javax.jms.TextMessage
065 */
066public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage {
067
068    // TODO: verify classloader
069    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE;
070    static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader();
071
072    protected transient Serializable object;
073
074    public Message copy() {
075        ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
076        copy(copy);
077        return copy;
078    }
079
080    private void copy(ActiveMQObjectMessage copy) {
081        ActiveMQConnection connection = getConnection();
082        if (connection == null || !connection.isObjectMessageSerializationDefered()) {
083            storeContent();
084            copy.object = null;
085        } else {
086            copy.object = object;
087        }
088        super.copy(copy);
089
090    }
091
092    @Override
093    public void storeContentAndClear() {
094        storeContent();
095        object = null;
096    }
097
098    @Override
099    public void storeContent() {
100        ByteSequence bodyAsBytes = getContent();
101        if (bodyAsBytes == null && object != null) {
102            try {
103                ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
104                OutputStream os = bytesOut;
105                ActiveMQConnection connection = getConnection();
106                if (connection != null && connection.isUseCompression()) {
107                    compressed = true;
108                    os = new DeflaterOutputStream(os);
109                }
110                DataOutputStream dataOut = new DataOutputStream(os);
111                ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
112                objOut.writeObject(object);
113                objOut.flush();
114                objOut.reset();
115                objOut.close();
116                setContent(bytesOut.toByteSequence());
117            } catch (IOException ioe) {
118                throw new RuntimeException(ioe.getMessage(), ioe);
119            }
120        }
121    }
122
123    public byte getDataStructureType() {
124        return DATA_STRUCTURE_TYPE;
125    }
126
127    public String getJMSXMimeType() {
128        return "jms/object-message";
129    }
130
131    /**
132     * Clears out the message body. Clearing a message's body does not clear its
133     * header values or property entries. <p/>
134     * <P>
135     * If this message body was read-only, calling this method leaves the
136     * message body in the same state as an empty body in a newly created
137     * message.
138     *
139     * @throws JMSException if the JMS provider fails to clear the message body
140     *                 due to some internal error.
141     */
142
143    public void clearBody() throws JMSException {
144        super.clearBody();
145        this.object = null;
146    }
147
148    /**
149     * Sets the serializable object containing this message's data. It is
150     * important to note that an <CODE>ObjectMessage</CODE> contains a
151     * snapshot of the object at the time <CODE>setObject()</CODE> is called;
152     * subsequent modifications of the object will have no effect on the
153     * <CODE>ObjectMessage</CODE> body.
154     *
155     * @param newObject the message's data
156     * @throws JMSException if the JMS provider fails to set the object due to
157     *                 some internal error.
158     * @throws javax.jms.MessageFormatException if object serialization fails.
159     * @throws javax.jms.MessageNotWriteableException if the message is in
160     *                 read-only mode.
161     */
162
163    public void setObject(Serializable newObject) throws JMSException {
164        checkReadOnlyBody();
165        this.object = newObject;
166        setContent(null);
167        ActiveMQConnection connection = getConnection();
168        if (connection == null || !connection.isObjectMessageSerializationDefered()) {
169            storeContent();
170        }
171    }
172
173    /**
174     * Gets the serializable object containing this message's data. The default
175     * value is null.
176     *
177     * @return the serializable object containing this message's data
178     * @throws JMSException
179     */
180    public Serializable getObject() throws JMSException {
181        if (object == null && getContent() != null) {
182            try {
183                ByteSequence content = getContent();
184                InputStream is = new ByteArrayInputStream(content);
185                if (isCompressed()) {
186                    is = new InflaterInputStream(is);
187                }
188                DataInputStream dataIn = new DataInputStream(is);
189                ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
190                try {
191                    object = (Serializable)objIn.readObject();
192                } catch (ClassNotFoundException ce) {
193                    throw JMSExceptionSupport.create("Failed to build body from content. Serializable class not available to broker. Reason: " + ce, ce);
194                } finally {
195                    dataIn.close();
196                }
197            } catch (IOException e) {
198                throw JMSExceptionSupport.create("Failed to build body from bytes. Reason: " + e, e);
199            }
200        }
201        return this.object;
202    }
203
204    @Override
205    public void beforeMarshall(WireFormat wireFormat) throws IOException {
206        super.beforeMarshall(wireFormat);
207        // may have initiated on vm transport with deferred marshalling
208        storeContent();
209    }
210
211    public void clearMarshalledState() throws JMSException {
212        super.clearMarshalledState();
213        this.object = null;
214    }
215
216    public void onMessageRolledBack() {
217        super.onMessageRolledBack();
218
219        // lets force the object to be deserialized again - as we could have
220        // changed the object
221        object = null;
222    }
223
224    @Override
225    public void compress() throws IOException {
226        storeContent();
227        super.compress();
228    }
229
230    public String toString() {
231        try {
232            getObject();
233        } catch (JMSException e) {
234        }
235        return super.toString();
236    }
237}