001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.openwire; 018 019import java.io.DataInput; 020import java.io.DataOutput; 021import java.io.IOException; 022import java.lang.reflect.Method; 023import java.util.HashMap; 024import java.util.Map; 025 026import org.apache.activemq.command.CommandTypes; 027import org.apache.activemq.command.DataStructure; 028import org.apache.activemq.command.WireFormatInfo; 029import org.apache.activemq.util.ByteSequence; 030import org.apache.activemq.util.ByteSequenceData; 031import org.apache.activemq.util.DataByteArrayInputStream; 032import org.apache.activemq.util.DataByteArrayOutputStream; 033import org.apache.activemq.wireformat.WireFormat; 034 035/** 036 * 037 * 038 */ 039public final class OpenWireFormat implements WireFormat { 040 041 public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION; 042 public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION; 043 public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE; 044 045 static final byte NULL_TYPE = CommandTypes.NULL; 046 private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2; 047 private static final int MARSHAL_CACHE_FREE_SPACE = 100; 048 049 private DataStreamMarshaller dataMarshallers[]; 050 private int version; 051 private boolean stackTraceEnabled; 052 private boolean tcpNoDelayEnabled; 053 private boolean cacheEnabled; 054 private boolean tightEncodingEnabled; 055 private boolean sizePrefixDisabled; 056 private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE; 057 058 // The following fields are used for value caching 059 private short nextMarshallCacheIndex; 060 private short nextMarshallCacheEvictionIndex; 061 private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>(); 062 private DataStructure marshallCache[] = null; 063 private DataStructure unmarshallCache[] = null; 064 private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream(); 065 private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream(); 066 private WireFormatInfo preferedWireFormatInfo; 067 068 public OpenWireFormat() { 069 this(DEFAULT_VERSION); 070 } 071 072 public OpenWireFormat(int i) { 073 setVersion(i); 074 } 075 076 public int hashCode() { 077 return version ^ (cacheEnabled ? 0x10000000 : 0x20000000) 078 ^ (stackTraceEnabled ? 0x01000000 : 0x02000000) 079 ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000) 080 ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000); 081 } 082 083 public OpenWireFormat copy() { 084 OpenWireFormat answer = new OpenWireFormat(version); 085 answer.stackTraceEnabled = stackTraceEnabled; 086 answer.tcpNoDelayEnabled = tcpNoDelayEnabled; 087 answer.cacheEnabled = cacheEnabled; 088 answer.tightEncodingEnabled = tightEncodingEnabled; 089 answer.sizePrefixDisabled = sizePrefixDisabled; 090 answer.preferedWireFormatInfo = preferedWireFormatInfo; 091 return answer; 092 } 093 094 public boolean equals(Object object) { 095 if (object == null) { 096 return false; 097 } 098 OpenWireFormat o = (OpenWireFormat)object; 099 return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled 100 && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled 101 && o.sizePrefixDisabled == sizePrefixDisabled; 102 } 103 104 105 public String toString() { 106 return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled=" 107 + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + "}"; 108 // return "OpenWireFormat{id="+id+", 109 // tightEncodingEnabled="+tightEncodingEnabled+"}"; 110 } 111 112 public int getVersion() { 113 return version; 114 } 115 116 public synchronized ByteSequence marshal(Object command) throws IOException { 117 118 if (cacheEnabled) { 119 runMarshallCacheEvictionSweep(); 120 } 121 122 ByteSequence sequence = null; 123 int size = 1; 124 if (command != null) { 125 126 DataStructure c = (DataStructure)command; 127 byte type = c.getDataStructureType(); 128 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 129 if (dsm == null) { 130 throw new IOException("Unknown data type: " + type); 131 } 132 if (tightEncodingEnabled) { 133 134 BooleanStream bs = new BooleanStream(); 135 size += dsm.tightMarshal1(this, c, bs); 136 size += bs.marshalledSize(); 137 138 bytesOut.restart(size); 139 if (!sizePrefixDisabled) { 140 bytesOut.writeInt(size); 141 } 142 bytesOut.writeByte(type); 143 bs.marshal(bytesOut); 144 dsm.tightMarshal2(this, c, bytesOut, bs); 145 sequence = bytesOut.toByteSequence(); 146 147 } else { 148 bytesOut.restart(); 149 if (!sizePrefixDisabled) { 150 bytesOut.writeInt(0); // we don't know the final size 151 // yet but write this here for 152 // now. 153 } 154 bytesOut.writeByte(type); 155 dsm.looseMarshal(this, c, bytesOut); 156 sequence = bytesOut.toByteSequence(); 157 158 if (!sizePrefixDisabled) { 159 size = sequence.getLength() - 4; 160 int pos = sequence.offset; 161 ByteSequenceData.writeIntBig(sequence, size); 162 sequence.offset = pos; 163 } 164 } 165 166 } else { 167 bytesOut.restart(5); 168 bytesOut.writeInt(size); 169 bytesOut.writeByte(NULL_TYPE); 170 sequence = bytesOut.toByteSequence(); 171 } 172 173 return sequence; 174 } 175 176 public synchronized Object unmarshal(ByteSequence sequence) throws IOException { 177 bytesIn.restart(sequence); 178 // DataInputStream dis = new DataInputStream(new 179 // ByteArrayInputStream(sequence)); 180 181 if (!sizePrefixDisabled) { 182 int size = bytesIn.readInt(); 183 if (sequence.getLength() - 4 != size) { 184 // throw new IOException("Packet size does not match marshaled 185 // size"); 186 } 187 188 if (size > maxFrameSize) { 189 throw new IOException("Frame size of " + (size / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB"); 190 } 191 } 192 193 Object command = doUnmarshal(bytesIn); 194 // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) { 195 // ((MarshallAware) command).setCachedMarshalledForm(this, sequence); 196 // } 197 return command; 198 } 199 200 public synchronized void marshal(Object o, DataOutput dataOut) throws IOException { 201 202 if (cacheEnabled) { 203 runMarshallCacheEvictionSweep(); 204 } 205 206 int size = 1; 207 if (o != null) { 208 209 DataStructure c = (DataStructure)o; 210 byte type = c.getDataStructureType(); 211 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 212 if (dsm == null) { 213 throw new IOException("Unknown data type: " + type); 214 } 215 if (tightEncodingEnabled) { 216 BooleanStream bs = new BooleanStream(); 217 size += dsm.tightMarshal1(this, c, bs); 218 size += bs.marshalledSize(); 219 220 if (!sizePrefixDisabled) { 221 dataOut.writeInt(size); 222 } 223 224 dataOut.writeByte(type); 225 bs.marshal(dataOut); 226 dsm.tightMarshal2(this, c, dataOut, bs); 227 228 } else { 229 DataOutput looseOut = dataOut; 230 231 if (!sizePrefixDisabled) { 232 bytesOut.restart(); 233 looseOut = bytesOut; 234 } 235 236 looseOut.writeByte(type); 237 dsm.looseMarshal(this, c, looseOut); 238 239 if (!sizePrefixDisabled) { 240 ByteSequence sequence = bytesOut.toByteSequence(); 241 dataOut.writeInt(sequence.getLength()); 242 dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength()); 243 } 244 245 } 246 247 } else { 248 if (!sizePrefixDisabled) { 249 dataOut.writeInt(size); 250 } 251 dataOut.writeByte(NULL_TYPE); 252 } 253 } 254 255 public Object unmarshal(DataInput dis) throws IOException { 256 DataInput dataIn = dis; 257 if (!sizePrefixDisabled) { 258 int size = dis.readInt(); 259 if (size > maxFrameSize) { 260 throw new IOException("Frame size of " + (size / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB"); 261 } 262 // int size = dis.readInt(); 263 // byte[] data = new byte[size]; 264 // dis.readFully(data); 265 // bytesIn.restart(data); 266 // dataIn = bytesIn; 267 } 268 return doUnmarshal(dataIn); 269 } 270 271 /** 272 * Used by NIO or AIO transports 273 */ 274 public int tightMarshal1(Object o, BooleanStream bs) throws IOException { 275 int size = 1; 276 if (o != null) { 277 DataStructure c = (DataStructure)o; 278 byte type = c.getDataStructureType(); 279 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 280 if (dsm == null) { 281 throw new IOException("Unknown data type: " + type); 282 } 283 284 size += dsm.tightMarshal1(this, c, bs); 285 size += bs.marshalledSize(); 286 } 287 return size; 288 } 289 290 /** 291 * Used by NIO or AIO transports; note that the size is not written as part 292 * of this method. 293 */ 294 public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException { 295 if (cacheEnabled) { 296 runMarshallCacheEvictionSweep(); 297 } 298 299 if (o != null) { 300 DataStructure c = (DataStructure)o; 301 byte type = c.getDataStructureType(); 302 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 303 if (dsm == null) { 304 throw new IOException("Unknown data type: " + type); 305 } 306 ds.writeByte(type); 307 bs.marshal(ds); 308 dsm.tightMarshal2(this, c, ds, bs); 309 } 310 } 311 312 /** 313 * Allows you to dynamically switch the version of the openwire protocol 314 * being used. 315 * 316 * @param version 317 */ 318 public void setVersion(int version) { 319 String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory"; 320 Class mfClass; 321 try { 322 mfClass = Class.forName(mfName, false, getClass().getClassLoader()); 323 } catch (ClassNotFoundException e) { 324 throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version 325 + ", could not load " + mfName) 326 .initCause(e); 327 } 328 try { 329 Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class}); 330 dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this}); 331 } catch (Throwable e) { 332 throw (IllegalArgumentException)new IllegalArgumentException( 333 "Invalid version: " 334 + version 335 + ", " 336 + mfName 337 + " does not properly implement the createMarshallerMap method.") 338 .initCause(e); 339 } 340 this.version = version; 341 } 342 343 public Object doUnmarshal(DataInput dis) throws IOException { 344 byte dataType = dis.readByte(); 345 if (dataType != NULL_TYPE) { 346 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; 347 if (dsm == null) { 348 throw new IOException("Unknown data type: " + dataType); 349 } 350 Object data = dsm.createObject(); 351 if (this.tightEncodingEnabled) { 352 BooleanStream bs = new BooleanStream(); 353 bs.unmarshal(dis); 354 dsm.tightUnmarshal(this, data, dis, bs); 355 } else { 356 dsm.looseUnmarshal(this, data, dis); 357 } 358 return data; 359 } else { 360 return null; 361 } 362 } 363 364 // public void debug(String msg) { 365 // String t = (Thread.currentThread().getName()+" ").substring(0, 40); 366 // System.out.println(t+": "+msg); 367 // } 368 public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException { 369 bs.writeBoolean(o != null); 370 if (o == null) { 371 return 0; 372 } 373 374 if (o.isMarshallAware()) { 375 // MarshallAware ma = (MarshallAware)o; 376 ByteSequence sequence = null; 377 // sequence=ma.getCachedMarshalledForm(this); 378 bs.writeBoolean(sequence != null); 379 if (sequence != null) { 380 return 1 + sequence.getLength(); 381 } 382 } 383 384 byte type = o.getDataStructureType(); 385 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 386 if (dsm == null) { 387 throw new IOException("Unknown data type: " + type); 388 } 389 return 1 + dsm.tightMarshal1(this, o, bs); 390 } 391 392 public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs) 393 throws IOException { 394 if (!bs.readBoolean()) { 395 return; 396 } 397 398 byte type = o.getDataStructureType(); 399 ds.writeByte(type); 400 401 if (o.isMarshallAware() && bs.readBoolean()) { 402 403 // We should not be doing any caching 404 throw new IOException("Corrupted stream"); 405 // MarshallAware ma = (MarshallAware) o; 406 // ByteSequence sequence=ma.getCachedMarshalledForm(this); 407 // ds.write(sequence.getData(), sequence.getOffset(), 408 // sequence.getLength()); 409 410 } else { 411 412 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 413 if (dsm == null) { 414 throw new IOException("Unknown data type: " + type); 415 } 416 dsm.tightMarshal2(this, o, ds, bs); 417 418 } 419 } 420 421 public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException { 422 if (bs.readBoolean()) { 423 424 byte dataType = dis.readByte(); 425 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; 426 if (dsm == null) { 427 throw new IOException("Unknown data type: " + dataType); 428 } 429 DataStructure data = dsm.createObject(); 430 431 if (data.isMarshallAware() && bs.readBoolean()) { 432 433 dis.readInt(); 434 dis.readByte(); 435 436 BooleanStream bs2 = new BooleanStream(); 437 bs2.unmarshal(dis); 438 dsm.tightUnmarshal(this, data, dis, bs2); 439 440 // TODO: extract the sequence from the dis and associate it. 441 // MarshallAware ma = (MarshallAware)data 442 // ma.setCachedMarshalledForm(this, sequence); 443 444 } else { 445 dsm.tightUnmarshal(this, data, dis, bs); 446 } 447 448 return data; 449 } else { 450 return null; 451 } 452 } 453 454 public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException { 455 if (dis.readBoolean()) { 456 457 byte dataType = dis.readByte(); 458 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; 459 if (dsm == null) { 460 throw new IOException("Unknown data type: " + dataType); 461 } 462 DataStructure data = dsm.createObject(); 463 dsm.looseUnmarshal(this, data, dis); 464 return data; 465 466 } else { 467 return null; 468 } 469 } 470 471 public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException { 472 dataOut.writeBoolean(o != null); 473 if (o != null) { 474 byte type = o.getDataStructureType(); 475 dataOut.writeByte(type); 476 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 477 if (dsm == null) { 478 throw new IOException("Unknown data type: " + type); 479 } 480 dsm.looseMarshal(this, o, dataOut); 481 } 482 } 483 484 public void runMarshallCacheEvictionSweep() { 485 // Do we need to start evicting?? 486 while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) { 487 488 marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]); 489 marshallCache[nextMarshallCacheEvictionIndex] = null; 490 491 nextMarshallCacheEvictionIndex++; 492 if (nextMarshallCacheEvictionIndex >= marshallCache.length) { 493 nextMarshallCacheEvictionIndex = 0; 494 } 495 496 } 497 } 498 499 public Short getMarshallCacheIndex(DataStructure o) { 500 return marshallCacheMap.get(o); 501 } 502 503 public Short addToMarshallCache(DataStructure o) { 504 short i = nextMarshallCacheIndex++; 505 if (nextMarshallCacheIndex >= marshallCache.length) { 506 nextMarshallCacheIndex = 0; 507 } 508 509 // We can only cache that item if there is space left. 510 if (marshallCacheMap.size() < marshallCache.length) { 511 marshallCache[i] = o; 512 Short index = new Short(i); 513 marshallCacheMap.put(o, index); 514 return index; 515 } else { 516 // Use -1 to indicate that the value was not cached due to cache 517 // being full. 518 return new Short((short)-1); 519 } 520 } 521 522 public void setInUnmarshallCache(short index, DataStructure o) { 523 524 // There was no space left in the cache, so we can't 525 // put this in the cache. 526 if (index == -1) { 527 return; 528 } 529 530 unmarshallCache[index] = o; 531 } 532 533 public DataStructure getFromUnmarshallCache(short index) { 534 return unmarshallCache[index]; 535 } 536 537 public void setStackTraceEnabled(boolean b) { 538 stackTraceEnabled = b; 539 } 540 541 public boolean isStackTraceEnabled() { 542 return stackTraceEnabled; 543 } 544 545 public boolean isTcpNoDelayEnabled() { 546 return tcpNoDelayEnabled; 547 } 548 549 public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) { 550 this.tcpNoDelayEnabled = tcpNoDelayEnabled; 551 } 552 553 public boolean isCacheEnabled() { 554 return cacheEnabled; 555 } 556 557 public void setCacheEnabled(boolean cacheEnabled) { 558 if(cacheEnabled){ 559 marshallCache = new DataStructure[MARSHAL_CACHE_SIZE]; 560 unmarshallCache = new DataStructure[MARSHAL_CACHE_SIZE]; 561 } 562 this.cacheEnabled = cacheEnabled; 563 } 564 565 public boolean isTightEncodingEnabled() { 566 return tightEncodingEnabled; 567 } 568 569 public void setTightEncodingEnabled(boolean tightEncodingEnabled) { 570 this.tightEncodingEnabled = tightEncodingEnabled; 571 } 572 573 public boolean isSizePrefixDisabled() { 574 return sizePrefixDisabled; 575 } 576 577 public void setSizePrefixDisabled(boolean prefixPacketSize) { 578 this.sizePrefixDisabled = prefixPacketSize; 579 } 580 581 public void setPreferedWireFormatInfo(WireFormatInfo info) { 582 this.preferedWireFormatInfo = info; 583 } 584 585 public WireFormatInfo getPreferedWireFormatInfo() { 586 return preferedWireFormatInfo; 587 } 588 589 public long getMaxFrameSize() { 590 return maxFrameSize; 591 } 592 593 public void setMaxFrameSize(long maxFrameSize) { 594 this.maxFrameSize = maxFrameSize; 595 } 596 597 public void renegotiateWireFormat(WireFormatInfo info) throws IOException { 598 599 if (preferedWireFormatInfo == null) { 600 throw new IllegalStateException("Wireformat cannot not be renegotiated."); 601 } 602 603 this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion())); 604 info.setVersion(this.getVersion()); 605 606 this.setMaxFrameSize(min(preferedWireFormatInfo.getMaxFrameSize(), info.getMaxFrameSize())); 607 info.setMaxFrameSize(this.getMaxFrameSize()); 608 609 this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled(); 610 info.setStackTraceEnabled(this.stackTraceEnabled); 611 612 this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled(); 613 info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled); 614 615 this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled(); 616 info.setCacheEnabled(this.cacheEnabled); 617 618 this.tightEncodingEnabled = info.isTightEncodingEnabled() 619 && preferedWireFormatInfo.isTightEncodingEnabled(); 620 info.setTightEncodingEnabled(this.tightEncodingEnabled); 621 622 this.sizePrefixDisabled = info.isSizePrefixDisabled() 623 && preferedWireFormatInfo.isSizePrefixDisabled(); 624 info.setSizePrefixDisabled(this.sizePrefixDisabled); 625 626 if (cacheEnabled) { 627 628 int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize()); 629 info.setCacheSize(size); 630 631 if (size == 0) { 632 size = MARSHAL_CACHE_SIZE; 633 } 634 635 marshallCache = new DataStructure[size]; 636 unmarshallCache = new DataStructure[size]; 637 nextMarshallCacheIndex = 0; 638 nextMarshallCacheEvictionIndex = 0; 639 marshallCacheMap = new HashMap<DataStructure, Short>(); 640 } else { 641 marshallCache = null; 642 unmarshallCache = null; 643 nextMarshallCacheIndex = 0; 644 nextMarshallCacheEvictionIndex = 0; 645 marshallCacheMap = null; 646 } 647 648 } 649 650 protected int min(int version1, int version2) { 651 if (version1 < version2 && version1 > 0 || version2 <= 0) { 652 return version1; 653 } 654 return version2; 655 } 656 657 protected long min(long version1, long version2) { 658 if (version1 < version2 && version1 > 0 || version2 <= 0) { 659 return version1; 660 } 661 return version2; 662 } 663}