/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.amqp;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.activemq.transport.amqp.AmqpWireFormat.ResetListener;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * State based Frame reader that is used in the NIO based transports where
 * AMQP frames can come in in partial or overlapping forms.
 * <p>
 * The parser moves through three states: it first collects the fixed size
 * AMQP header, then alternates between reading a four byte frame size and
 * reading the frame contents that the size announced. Every completed header
 * or frame is handed to the configured {@link AMQPFrameSink}.
 */
public class AmqpFrameParser {

    private static final Logger LOG = LoggerFactory.getLogger(AmqpFrameParser.class);

    /**
     * Receiver of the objects produced by the parser.
     */
    public interface AMQPFrameSink {

        /**
         * Called once for each completed AMQP header or frame.
         *
         * @param frame
         *        either an {@link AmqpHeader} or a {@link Buffer} holding one
         *        complete frame, size prefix included.
         */
        void onFrame(Object frame);
    }

    /** Number of bytes used to encode the size prefix of a frame. */
    private static final byte AMQP_FRAME_SIZE_BYTES = 4;

    /** Number of bytes collected by the header reader before an AmqpHeader is emitted. */
    private static final byte AMQP_HEADER_BYTES = 8;

    private final AMQPFrameSink frameSink;

    /** Parser for the current state, lazily set to the header reader on first use. */
    private FrameParser currentParser;
    private AmqpWireFormat wireFormat;

    /**
     * Creates a parser that delivers completed headers and frames to the given sink.
     *
     * @param sink
     *        the receiver of parsed headers and frames.
     */
    public AmqpFrameParser(AMQPFrameSink sink) {
        this.frameSink = sink;
    }

    /**
     * Creates a parser that forwards every completed header or frame to the
     * given transport by calling its {@code doConsume} method.
     *
     * @param transport
     *        the transport that consumes parsed headers and frames.
     */
    public AmqpFrameParser(final TcpTransport transport) {
        this.frameSink = new AMQPFrameSink() {

            @Override
            public void onFrame(Object frame) {
                transport.doConsume(frame);
            }
        };
    }

    /**
     * Feeds newly received bytes into the parser. A {@code null} or empty
     * buffer is ignored.
     *
     * @param incoming
     *        the bytes read from the connection, may hold a partial frame,
     *        exactly one frame, or several frames.
     *
     * @throws Exception if the incoming data announces an invalid frame size.
     */
    public void parse(ByteBuffer incoming) throws Exception {

        if (incoming == null || !incoming.hasRemaining()) {
            return;
        }

        if (currentParser == null) {
            currentParser = initializeHeaderParser();
        }

        // Parser stack will run until current incoming data has all been consumed.
        currentParser.parse(incoming);
    }

    /**
     * Returns the parser to its initial state where it expects an AMQP header.
     */
    public void reset() {
        currentParser = initializeHeaderParser();
    }

    /**
     * Checks a frame size read from the wire before a buffer is allocated for it.
     * <p>
     * The four size bytes are accumulated into a signed {@code int}, so an
     * encoded value with the high bit set arrives here as a negative number.
     * The value is therefore widened to its unsigned form before it is compared,
     * otherwise such a size would slip past the maximum check and fail later
     * with an unchecked exception when the frame buffer is allocated.
     *
     * @param frameSize
     *        the raw four byte size value read from the wire.
     *
     * @throws IOException if the size exceeds the configured maximum, cannot be
     *         held in a single buffer, or is too small to contain its own size prefix.
     */
    private void validateFrameSize(int frameSize) throws IOException {
        long maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE;
        if (wireFormat != null) {
            maxFrameSize = wireFormat.getMaxFrameSize();
        }

        // Reinterpret the signed int as the unsigned 32 bit value that was sent.
        final long unsignedFrameSize = frameSize & 0xFFFFFFFFL;

        if (unsignedFrameSize > maxFrameSize) {
            throw new IOException("Frame size of " + unsignedFrameSize + " larger than max allowed " + maxFrameSize);
        }

        // Only reachable when the configured maximum is itself above Integer.MAX_VALUE;
        // the content reader allocates an array so larger sizes cannot be buffered.
        if (unsignedFrameSize > Integer.MAX_VALUE) {
            throw new IOException("Frame size of " + unsignedFrameSize + " exceeds the largest frame that can be buffered");
        }

        // The content reader writes the four byte size prefix back into the frame
        // buffer, so the announced size must at least cover that prefix.
        // NOTE(review): the AMQP 1.0 specification requires a minimum of 8 bytes;
        // sizes of 4 to 7 are still passed on to the sink here - confirm that the
        // downstream decoder rejects them.
        if (unsignedFrameSize < AMQP_FRAME_SIZE_BYTES) {
            throw new IOException("Frame size of " + unsignedFrameSize + " smaller than the frame size prefix of " + AMQP_FRAME_SIZE_BYTES + " bytes");
        }
    }

    /**
     * Assigns the wire format used to look up the maximum allowed frame size.
     * When a non-null wire format is given, this parser registers itself so that
     * a protocol reset signalled by the wire format also resets the parser.
     *
     * @param wireFormat
     *        the wire format to use, or {@code null} to fall back to the default
     *        maximum frame size.
     */
    public void setWireFormat(AmqpWireFormat wireFormat) {
        this.wireFormat = wireFormat;
        if (wireFormat != null) {
            wireFormat.setProtocolResetListener(new ResetListener() {

                @Override
                public void onProtocolReset() {
                    reset();
                }
            });
        }
    }

    /**
     * @return the wire format assigned to this parser, or {@code null} if none was set.
     */
    public AmqpWireFormat getWireFormat() {
        return this.wireFormat;
    }

    //----- Prepare the current frame parser for use -------------------------//

    /** Resets and returns the parser that reads the AMQP header. */
    private FrameParser initializeHeaderParser() {
        headerReader.reset(AMQP_HEADER_BYTES);
        return headerReader;
    }

    /** Resets and returns the parser that reads the four byte frame size. */
    private FrameParser initializeFrameLengthParser() {
        frameSizeReader.reset(AMQP_FRAME_SIZE_BYTES);
        return frameSizeReader;
    }

    /** Resets the content parser for a frame of the given total size and returns it. */
    private FrameParser initializeContentReader(int contentLength) {
        contentReader.reset(contentLength);
        return contentReader;
    }

    //----- Frame parser implementations -------------------------------------//

    /**
     * One state of the parser. An implementation consumes what it needs from the
     * incoming data, switches {@code currentParser} when its unit is complete and
     * passes any bytes left over to the next state.
     */
    private interface FrameParser {

        void parse(ByteBuffer incoming) throws IOException;

        void reset(int nextExpectedReadSize);
    }

    /**
     * Collects the AMQP header bytes, emits them as an {@link AmqpHeader} and
     * then switches to reading frame sizes.
     */
    private final FrameParser headerReader = new FrameParser() {

        private final Buffer header = new Buffer(AMQP_HEADER_BYTES);

        @Override
        public void parse(ByteBuffer incoming) throws IOException {
            // Take only as many bytes as are still missing from the header.
            int length = Math.min(incoming.remaining(), header.length - header.offset);

            incoming.get(header.data, header.offset, length);
            header.offset += length;

            if (header.offset == AMQP_HEADER_BYTES) {
                // Presumably rewinds the buffer so the copy covers the whole header;
                // verify against the hawtbuf Buffer implementation.
                header.reset();
                AmqpHeader amqpHeader = new AmqpHeader(header.deepCopy(), false);
                // Switch state before notifying the sink so that a reset triggered
                // from inside the callback is not overwritten afterwards.
                currentParser = initializeFrameLengthParser();
                frameSink.onFrame(amqpHeader);
                if (incoming.hasRemaining()) {
                    currentParser.parse(incoming);
                }
            }
        }

        @Override
        public void reset(int nextExpectedReadSize) {
            // The header size is fixed, so the requested size is not needed here.
            header.reset();
        }
    };

    /**
     * Assembles the four byte, big endian frame size one byte at a time so that
     * the size may be split across any number of incoming buffers.
     */
    private final FrameParser frameSizeReader = new FrameParser() {

        private int frameSize;

        /** Number of size bytes still to be read. */
        private int multiplier;

        @Override
        public void parse(ByteBuffer incoming) throws IOException {

            while (incoming.hasRemaining()) {
                // Most significant byte first: shift by 24, 16, 8 and finally 0 bits.
                frameSize += ((incoming.get() & 0xFF) << --multiplier * Byte.SIZE);

                if (multiplier == 0) {
                    LOG.trace("Next incoming frame length: {}", frameSize);
                    validateFrameSize(frameSize);
                    currentParser = initializeContentReader(frameSize);
                    if (incoming.hasRemaining()) {
                        currentParser.parse(incoming);
                        return;
                    }
                }
            }
        }

        @Override
        public void reset(int nextExpectedReadSize) {
            multiplier = AMQP_FRAME_SIZE_BYTES;
            frameSize = 0;
        }
    };

    /**
     * Reads the remainder of a frame into a buffer that already holds the size
     * prefix and delivers the completed frame to the sink.
     */
    private final FrameParser contentReader = new FrameParser() {

        private Buffer frame;

        @Override
        public void parse(ByteBuffer incoming) throws IOException {
            // Take only as many bytes as are still missing from the frame.
            int length = Math.min(incoming.remaining(), frame.getLength() - frame.offset);
            incoming.get(frame.data, frame.offset, length);
            frame.offset += length;

            if (frame.offset == frame.length) {
                LOG.trace("Contents of size {} have been read", frame.length);
                frame.reset();
                frameSink.onFrame(frame);
                // The sink may have reset the parser; only move on to the next
                // frame size when this reader is still the active state.
                if (currentParser == this) {
                    currentParser = initializeFrameLengthParser();
                }
                if (incoming.hasRemaining()) {
                    currentParser.parse(incoming);
                }
            }
        }

        @Override
        public void reset(int nextExpectedReadSize) {
            // Allocate a new Buffer to hold the incoming frame.  We must write
            // back the frame size value before continuing on to read the indicated
            // frame size minus the size of the AMQP frame size header value.
            frame = new Buffer(nextExpectedReadSize);
            frame.bigEndianEditor().writeInt(nextExpectedReadSize);

            // Reset the length to total length as we do direct write after this.
            frame.length = frame.data.length;
        }
    };
}