001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.openwire;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.lang.reflect.Method;
023import java.util.HashMap;
024import java.util.Map;
025
026import org.apache.activemq.command.CommandTypes;
027import org.apache.activemq.command.DataStructure;
028import org.apache.activemq.command.WireFormatInfo;
029import org.apache.activemq.util.ByteSequence;
030import org.apache.activemq.util.ByteSequenceData;
031import org.apache.activemq.util.DataByteArrayInputStream;
032import org.apache.activemq.util.DataByteArrayOutputStream;
033import org.apache.activemq.wireformat.WireFormat;
034
035/**
036 * 
037 * 
038 */
039public final class OpenWireFormat implements WireFormat {
040
041    public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
042    public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION;
043    public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
044
045    static final byte NULL_TYPE = CommandTypes.NULL;
046    private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
047    private static final int MARSHAL_CACHE_FREE_SPACE = 100;
048
049    private DataStreamMarshaller dataMarshallers[];
050    private int version;
051    private boolean stackTraceEnabled;
052    private boolean tcpNoDelayEnabled;
053    private boolean cacheEnabled;
054    private boolean tightEncodingEnabled;
055    private boolean sizePrefixDisabled;
056    private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
057
058    // The following fields are used for value caching
059    private short nextMarshallCacheIndex;
060    private short nextMarshallCacheEvictionIndex;
061    private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>();
062    private DataStructure marshallCache[] = null;
063    private DataStructure unmarshallCache[] = null;
064    private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
065    private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
066    private WireFormatInfo preferedWireFormatInfo;
067    
068    public OpenWireFormat() {
069        this(DEFAULT_VERSION);
070    }
071
072    public OpenWireFormat(int i) {
073        setVersion(i);
074    }
075
076    public int hashCode() {
077        return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
078               ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
079               ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
080               ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
081    }
082
083    public OpenWireFormat copy() {
084        OpenWireFormat answer = new OpenWireFormat(version);
085        answer.stackTraceEnabled = stackTraceEnabled;
086        answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
087        answer.cacheEnabled = cacheEnabled;
088        answer.tightEncodingEnabled = tightEncodingEnabled;
089        answer.sizePrefixDisabled = sizePrefixDisabled;
090        answer.preferedWireFormatInfo = preferedWireFormatInfo;
091        return answer;
092    }
093
094    public boolean equals(Object object) {
095        if (object == null) {
096            return false;
097        }
098        OpenWireFormat o = (OpenWireFormat)object;
099        return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
100               && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
101               && o.sizePrefixDisabled == sizePrefixDisabled;
102    }
103
104
105    public String toString() {
106        return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
107               + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled +  ", maxFrameSize=" + maxFrameSize + "}";
108        // return "OpenWireFormat{id="+id+",
109        // tightEncodingEnabled="+tightEncodingEnabled+"}";
110    }
111
112    public int getVersion() {
113        return version;
114    }
115
116    public synchronized ByteSequence marshal(Object command) throws IOException {
117
118        if (cacheEnabled) {
119            runMarshallCacheEvictionSweep();
120        }
121
122        ByteSequence sequence = null;
123        int size = 1;
124        if (command != null) {
125
126            DataStructure c = (DataStructure)command;
127            byte type = c.getDataStructureType();
128            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
129            if (dsm == null) {
130                throw new IOException("Unknown data type: " + type);
131            }
132            if (tightEncodingEnabled) {
133
134                BooleanStream bs = new BooleanStream();
135                size += dsm.tightMarshal1(this, c, bs);
136                size += bs.marshalledSize();
137
138                bytesOut.restart(size);
139                if (!sizePrefixDisabled) {
140                    bytesOut.writeInt(size);
141                }
142                bytesOut.writeByte(type);
143                bs.marshal(bytesOut);
144                dsm.tightMarshal2(this, c, bytesOut, bs);
145                sequence = bytesOut.toByteSequence();
146
147            } else {
148                bytesOut.restart();
149                if (!sizePrefixDisabled) {
150                    bytesOut.writeInt(0); // we don't know the final size
151                    // yet but write this here for
152                    // now.
153                }
154                bytesOut.writeByte(type);
155                dsm.looseMarshal(this, c, bytesOut);
156                sequence = bytesOut.toByteSequence();
157
158                if (!sizePrefixDisabled) {
159                    size = sequence.getLength() - 4;
160                    int pos = sequence.offset;
161                    ByteSequenceData.writeIntBig(sequence, size);
162                    sequence.offset = pos;
163                }
164            }
165
166        } else {
167            bytesOut.restart(5);
168            bytesOut.writeInt(size);
169            bytesOut.writeByte(NULL_TYPE);
170            sequence = bytesOut.toByteSequence();
171        }
172
173        return sequence;
174    }
175
176    public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
177        bytesIn.restart(sequence);
178        // DataInputStream dis = new DataInputStream(new
179        // ByteArrayInputStream(sequence));
180
181        if (!sizePrefixDisabled) {
182            int size = bytesIn.readInt();
183            if (sequence.getLength() - 4 != size) {
184                // throw new IOException("Packet size does not match marshaled
185                // size");
186            }
187
188            if (size > maxFrameSize) {
189                throw new IOException("Frame size of " + (size / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
190            }
191        }
192
193        Object command = doUnmarshal(bytesIn);
194        // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
195        // ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
196        // }
197        return command;
198    }
199
200    public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {
201
202        if (cacheEnabled) {
203            runMarshallCacheEvictionSweep();
204        }
205
206        int size = 1;
207        if (o != null) {
208
209            DataStructure c = (DataStructure)o;
210            byte type = c.getDataStructureType();
211            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
212            if (dsm == null) {
213                throw new IOException("Unknown data type: " + type);
214            }
215            if (tightEncodingEnabled) {
216                BooleanStream bs = new BooleanStream();
217                size += dsm.tightMarshal1(this, c, bs);
218                size += bs.marshalledSize();
219
220                if (!sizePrefixDisabled) {
221                    dataOut.writeInt(size);
222                }
223
224                dataOut.writeByte(type);
225                bs.marshal(dataOut);
226                dsm.tightMarshal2(this, c, dataOut, bs);
227
228            } else {
229                DataOutput looseOut = dataOut;
230
231                if (!sizePrefixDisabled) {
232                    bytesOut.restart();
233                    looseOut = bytesOut;
234                }
235
236                looseOut.writeByte(type);
237                dsm.looseMarshal(this, c, looseOut);
238
239                if (!sizePrefixDisabled) {
240                    ByteSequence sequence = bytesOut.toByteSequence();
241                    dataOut.writeInt(sequence.getLength());
242                    dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
243                }
244
245            }
246
247        } else {
248            if (!sizePrefixDisabled) {
249                dataOut.writeInt(size);
250            }
251            dataOut.writeByte(NULL_TYPE);
252        }
253    }
254
255    public Object unmarshal(DataInput dis) throws IOException {
256        DataInput dataIn = dis;
257        if (!sizePrefixDisabled) {
258            int size = dis.readInt();
259            if (size > maxFrameSize) {
260                throw new IOException("Frame size of " + (size / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
261            }
262            // int size = dis.readInt();
263            // byte[] data = new byte[size];
264            // dis.readFully(data);
265            // bytesIn.restart(data);
266            // dataIn = bytesIn;
267        }
268        return doUnmarshal(dataIn);
269    }
270
271    /**
272     * Used by NIO or AIO transports
273     */
274    public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
275        int size = 1;
276        if (o != null) {
277            DataStructure c = (DataStructure)o;
278            byte type = c.getDataStructureType();
279            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
280            if (dsm == null) {
281                throw new IOException("Unknown data type: " + type);
282            }
283
284            size += dsm.tightMarshal1(this, c, bs);
285            size += bs.marshalledSize();
286        }
287        return size;
288    }
289
290    /**
291     * Used by NIO or AIO transports; note that the size is not written as part
292     * of this method.
293     */
294    public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException {
295        if (cacheEnabled) {
296            runMarshallCacheEvictionSweep();
297        }
298
299        if (o != null) {
300            DataStructure c = (DataStructure)o;
301            byte type = c.getDataStructureType();
302            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
303            if (dsm == null) {
304                throw new IOException("Unknown data type: " + type);
305            }
306            ds.writeByte(type);
307            bs.marshal(ds);
308            dsm.tightMarshal2(this, c, ds, bs);
309        }
310    }
311
312    /**
313     * Allows you to dynamically switch the version of the openwire protocol
314     * being used.
315     * 
316     * @param version
317     */
318    public void setVersion(int version) {
319        String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory";
320        Class mfClass;
321        try {
322            mfClass = Class.forName(mfName, false, getClass().getClassLoader());
323        } catch (ClassNotFoundException e) {
324            throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version
325                                                                         + ", could not load " + mfName)
326                .initCause(e);
327        }
328        try {
329            Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class});
330            dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this});
331        } catch (Throwable e) {
332            throw (IllegalArgumentException)new IllegalArgumentException(
333                                                                         "Invalid version: "
334                                                                             + version
335                                                                             + ", "
336                                                                             + mfName
337                                                                             + " does not properly implement the createMarshallerMap method.")
338                .initCause(e);
339        }
340        this.version = version;
341    }
342
343    public Object doUnmarshal(DataInput dis) throws IOException {
344        byte dataType = dis.readByte();
345        if (dataType != NULL_TYPE) {
346            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
347            if (dsm == null) {
348                throw new IOException("Unknown data type: " + dataType);
349            }
350            Object data = dsm.createObject();
351            if (this.tightEncodingEnabled) {
352                BooleanStream bs = new BooleanStream();
353                bs.unmarshal(dis);
354                dsm.tightUnmarshal(this, data, dis, bs);
355            } else {
356                dsm.looseUnmarshal(this, data, dis);
357            }
358            return data;
359        } else {
360            return null;
361        }
362    }
363
364    // public void debug(String msg) {
365    // String t = (Thread.currentThread().getName()+" ").substring(0, 40);
366    // System.out.println(t+": "+msg);
367    // }
368    public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException {
369        bs.writeBoolean(o != null);
370        if (o == null) {
371            return 0;
372        }
373
374        if (o.isMarshallAware()) {
375            // MarshallAware ma = (MarshallAware)o;
376            ByteSequence sequence = null;
377            // sequence=ma.getCachedMarshalledForm(this);
378            bs.writeBoolean(sequence != null);
379            if (sequence != null) {
380                return 1 + sequence.getLength();
381            }
382        }
383
384        byte type = o.getDataStructureType();
385        DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
386        if (dsm == null) {
387            throw new IOException("Unknown data type: " + type);
388        }
389        return 1 + dsm.tightMarshal1(this, o, bs);
390    }
391
392    public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs)
393        throws IOException {
394        if (!bs.readBoolean()) {
395            return;
396        }
397
398        byte type = o.getDataStructureType();
399        ds.writeByte(type);
400
401        if (o.isMarshallAware() && bs.readBoolean()) {
402
403            // We should not be doing any caching
404            throw new IOException("Corrupted stream");
405            // MarshallAware ma = (MarshallAware) o;
406            // ByteSequence sequence=ma.getCachedMarshalledForm(this);
407            // ds.write(sequence.getData(), sequence.getOffset(),
408            // sequence.getLength());
409
410        } else {
411
412            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
413            if (dsm == null) {
414                throw new IOException("Unknown data type: " + type);
415            }
416            dsm.tightMarshal2(this, o, ds, bs);
417
418        }
419    }
420
421    public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException {
422        if (bs.readBoolean()) {
423
424            byte dataType = dis.readByte();
425            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
426            if (dsm == null) {
427                throw new IOException("Unknown data type: " + dataType);
428            }
429            DataStructure data = dsm.createObject();
430
431            if (data.isMarshallAware() && bs.readBoolean()) {
432
433                dis.readInt();
434                dis.readByte();
435
436                BooleanStream bs2 = new BooleanStream();
437                bs2.unmarshal(dis);
438                dsm.tightUnmarshal(this, data, dis, bs2);
439
440                // TODO: extract the sequence from the dis and associate it.
441                // MarshallAware ma = (MarshallAware)data
442                // ma.setCachedMarshalledForm(this, sequence);
443
444            } else {
445                dsm.tightUnmarshal(this, data, dis, bs);
446            }
447
448            return data;
449        } else {
450            return null;
451        }
452    }
453
454    public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException {
455        if (dis.readBoolean()) {
456
457            byte dataType = dis.readByte();
458            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
459            if (dsm == null) {
460                throw new IOException("Unknown data type: " + dataType);
461            }
462            DataStructure data = dsm.createObject();
463            dsm.looseUnmarshal(this, data, dis);
464            return data;
465
466        } else {
467            return null;
468        }
469    }
470
471    public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException {
472        dataOut.writeBoolean(o != null);
473        if (o != null) {
474            byte type = o.getDataStructureType();
475            dataOut.writeByte(type);
476            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
477            if (dsm == null) {
478                throw new IOException("Unknown data type: " + type);
479            }
480            dsm.looseMarshal(this, o, dataOut);
481        }
482    }
483
484    public void runMarshallCacheEvictionSweep() {
485        // Do we need to start evicting??
486        while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) {
487
488            marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]);
489            marshallCache[nextMarshallCacheEvictionIndex] = null;
490
491            nextMarshallCacheEvictionIndex++;
492            if (nextMarshallCacheEvictionIndex >= marshallCache.length) {
493                nextMarshallCacheEvictionIndex = 0;
494            }
495
496        }
497    }
498
499    public Short getMarshallCacheIndex(DataStructure o) {
500        return marshallCacheMap.get(o);
501    }
502
503    public Short addToMarshallCache(DataStructure o) {
504        short i = nextMarshallCacheIndex++;
505        if (nextMarshallCacheIndex >= marshallCache.length) {
506            nextMarshallCacheIndex = 0;
507        }
508
509        // We can only cache that item if there is space left.
510        if (marshallCacheMap.size() < marshallCache.length) {
511            marshallCache[i] = o;
512            Short index = new Short(i);
513            marshallCacheMap.put(o, index);
514            return index;
515        } else {
516            // Use -1 to indicate that the value was not cached due to cache
517            // being full.
518            return new Short((short)-1);
519        }
520    }
521
522    public void setInUnmarshallCache(short index, DataStructure o) {
523
524        // There was no space left in the cache, so we can't
525        // put this in the cache.
526        if (index == -1) {
527            return;
528        }
529
530        unmarshallCache[index] = o;
531    }
532
533    public DataStructure getFromUnmarshallCache(short index) {
534        return unmarshallCache[index];
535    }
536
537    public void setStackTraceEnabled(boolean b) {
538        stackTraceEnabled = b;
539    }
540
541    public boolean isStackTraceEnabled() {
542        return stackTraceEnabled;
543    }
544
545    public boolean isTcpNoDelayEnabled() {
546        return tcpNoDelayEnabled;
547    }
548
549    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
550        this.tcpNoDelayEnabled = tcpNoDelayEnabled;
551    }
552
553    public boolean isCacheEnabled() {
554        return cacheEnabled;
555    }
556
557    public void setCacheEnabled(boolean cacheEnabled) {
558        if(cacheEnabled){
559            marshallCache = new DataStructure[MARSHAL_CACHE_SIZE];
560            unmarshallCache = new DataStructure[MARSHAL_CACHE_SIZE];
561        }
562        this.cacheEnabled = cacheEnabled;
563    }
564
565    public boolean isTightEncodingEnabled() {
566        return tightEncodingEnabled;
567    }
568
569    public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
570        this.tightEncodingEnabled = tightEncodingEnabled;
571    }
572
573    public boolean isSizePrefixDisabled() {
574        return sizePrefixDisabled;
575    }
576
577    public void setSizePrefixDisabled(boolean prefixPacketSize) {
578        this.sizePrefixDisabled = prefixPacketSize;
579    }
580
581    public void setPreferedWireFormatInfo(WireFormatInfo info) {
582        this.preferedWireFormatInfo = info;
583    }
584
585    public WireFormatInfo getPreferedWireFormatInfo() {
586        return preferedWireFormatInfo;
587    }
588
589    public long getMaxFrameSize() {
590        return maxFrameSize;
591    }
592
593    public void setMaxFrameSize(long maxFrameSize) {
594        this.maxFrameSize = maxFrameSize;
595    }
596
597    public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
598
599        if (preferedWireFormatInfo == null) {
600            throw new IllegalStateException("Wireformat cannot not be renegotiated.");
601        }
602
603        this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
604        info.setVersion(this.getVersion());
605
606        this.setMaxFrameSize(min(preferedWireFormatInfo.getMaxFrameSize(), info.getMaxFrameSize()));
607        info.setMaxFrameSize(this.getMaxFrameSize());
608
609        this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
610        info.setStackTraceEnabled(this.stackTraceEnabled);
611
612        this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
613        info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);
614
615        this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
616        info.setCacheEnabled(this.cacheEnabled);
617
618        this.tightEncodingEnabled = info.isTightEncodingEnabled()
619                                    && preferedWireFormatInfo.isTightEncodingEnabled();
620        info.setTightEncodingEnabled(this.tightEncodingEnabled);
621
622        this.sizePrefixDisabled = info.isSizePrefixDisabled()
623                                  && preferedWireFormatInfo.isSizePrefixDisabled();
624        info.setSizePrefixDisabled(this.sizePrefixDisabled);
625
626        if (cacheEnabled) {
627
628            int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
629            info.setCacheSize(size);
630
631            if (size == 0) {
632                size = MARSHAL_CACHE_SIZE;
633            }
634
635            marshallCache = new DataStructure[size];
636            unmarshallCache = new DataStructure[size];
637            nextMarshallCacheIndex = 0;
638            nextMarshallCacheEvictionIndex = 0;
639            marshallCacheMap = new HashMap<DataStructure, Short>();
640        } else {
641            marshallCache = null;
642            unmarshallCache = null;
643            nextMarshallCacheIndex = 0;
644            nextMarshallCacheEvictionIndex = 0;
645            marshallCacheMap = null;
646        }
647
648    }
649
650    protected int min(int version1, int version2) {
651        if (version1 < version2 && version1 > 0 || version2 <= 0) {
652            return version1;
653        }
654        return version2;
655    }
656
657    protected long min(long version1, long version2) {
658        if (version1 < version2 && version1 > 0 || version2 <= 0) {
659            return version1;
660        }
661        return version2;
662    }
663}