001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp;
018
019import java.io.IOException;
020import java.nio.ByteBuffer;
021
022import org.apache.activemq.transport.amqp.AmqpWireFormat.ResetListener;
023import org.apache.activemq.transport.tcp.TcpTransport;
024import org.fusesource.hawtbuf.Buffer;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028/**
029 * State based Frame reader that is used in the NIO based transports where
030 * AMQP frames can come in in partial or overlapping forms.
031 */
032public class AmqpFrameParser {
033
034    private static final Logger LOG = LoggerFactory.getLogger(AmqpFrameParser.class);
035
036    public interface AMQPFrameSink {
037        void onFrame(Object frame);
038    }
039
040    private static final byte AMQP_FRAME_SIZE_BYTES = 4;
041    private static final byte AMQP_HEADER_BYTES = 8;
042
043    private final AMQPFrameSink frameSink;
044
045    private FrameParser currentParser;
046    private AmqpWireFormat wireFormat;
047
048    public AmqpFrameParser(AMQPFrameSink sink) {
049        this.frameSink = sink;
050    }
051
052    public AmqpFrameParser(final TcpTransport transport) {
053        this.frameSink = new AMQPFrameSink() {
054
055            @Override
056            public void onFrame(Object frame) {
057                transport.doConsume(frame);
058            }
059        };
060    }
061
062    public void parse(ByteBuffer incoming) throws Exception {
063
064        if (incoming == null || !incoming.hasRemaining()) {
065            return;
066        }
067
068        if (currentParser == null) {
069            currentParser = initializeHeaderParser();
070        }
071
072        // Parser stack will run until current incoming data has all been consumed.
073        currentParser.parse(incoming);
074    }
075
076    public void reset() {
077        currentParser = initializeHeaderParser();
078    }
079
080    private void validateFrameSize(int frameSize) throws IOException {
081        long maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE;
082        if (wireFormat != null) {
083            maxFrameSize = wireFormat.getMaxFrameSize();
084        }
085
086        if (frameSize > maxFrameSize) {
087            throw new IOException("Frame size of " + frameSize + " larger than max allowed " + maxFrameSize);
088        }
089    }
090
091    public void setWireFormat(AmqpWireFormat wireFormat) {
092        this.wireFormat = wireFormat;
093        if (wireFormat != null) {
094            wireFormat.setProtocolResetListener(new ResetListener() {
095
096                @Override
097                public void onProtocolReset() {
098                    reset();
099                }
100            });
101        }
102    }
103
104    public AmqpWireFormat getWireFormat() {
105        return this.wireFormat;
106    }
107
108    //----- Prepare the current frame parser for use -------------------------//
109
110    private FrameParser initializeHeaderParser() {
111        headerReader.reset(AMQP_HEADER_BYTES);
112        return headerReader;
113    }
114
115    private FrameParser initializeFrameLengthParser() {
116        frameSizeReader.reset(AMQP_FRAME_SIZE_BYTES);
117        return frameSizeReader;
118    }
119
120    private FrameParser initializeContentReader(int contentLength) {
121        contentReader.reset(contentLength);
122        return contentReader;
123    }
124
125    //----- Frame parser implementations -------------------------------------//
126
127    private interface FrameParser {
128
129        void parse(ByteBuffer incoming) throws IOException;
130
131        void reset(int nextExpectedReadSize);
132    }
133
134    private final FrameParser headerReader = new FrameParser() {
135
136        private final Buffer header = new Buffer(AMQP_HEADER_BYTES);
137
138        @Override
139        public void parse(ByteBuffer incoming) throws IOException {
140            int length = Math.min(incoming.remaining(), header.length - header.offset);
141
142            incoming.get(header.data, header.offset, length);
143            header.offset += length;
144
145            if (header.offset == AMQP_HEADER_BYTES) {
146                header.reset();
147                AmqpHeader amqpHeader = new AmqpHeader(header.deepCopy(), false);
148                currentParser = initializeFrameLengthParser();
149                frameSink.onFrame(amqpHeader);
150                if (incoming.hasRemaining()) {
151                    currentParser.parse(incoming);
152                }
153            }
154        }
155
156        @Override
157        public void reset(int nextExpectedReadSize) {
158            header.reset();
159        }
160    };
161
162    private final FrameParser frameSizeReader = new FrameParser() {
163
164        private int frameSize;
165        private int multiplier;
166
167        @Override
168        public void parse(ByteBuffer incoming) throws IOException {
169
170            while (incoming.hasRemaining()) {
171                frameSize += ((incoming.get() & 0xFF) << --multiplier * Byte.SIZE);
172
173                if (multiplier == 0) {
174                    LOG.trace("Next incoming frame length: {}", frameSize);
175                    validateFrameSize(frameSize);
176                    currentParser = initializeContentReader(frameSize);
177                    if (incoming.hasRemaining()) {
178                        currentParser.parse(incoming);
179                        return;
180                    }
181                }
182            }
183        }
184
185        @Override
186        public void reset(int nextExpectedReadSize) {
187            multiplier = AMQP_FRAME_SIZE_BYTES;
188            frameSize = 0;
189        }
190    };
191
192    private final FrameParser contentReader = new FrameParser() {
193
194        private Buffer frame;
195
196        @Override
197        public void parse(ByteBuffer incoming) throws IOException {
198            int length = Math.min(incoming.remaining(), frame.getLength() - frame.offset);
199            incoming.get(frame.data, frame.offset, length);
200            frame.offset += length;
201
202            if (frame.offset == frame.length) {
203                LOG.trace("Contents of size {} have been read", frame.length);
204                frame.reset();
205                frameSink.onFrame(frame);
206                if (currentParser == this) {
207                    currentParser = initializeFrameLengthParser();
208                }
209                if (incoming.hasRemaining()) {
210                    currentParser.parse(incoming);
211                }
212            }
213        }
214
215        @Override
216        public void reset(int nextExpectedReadSize) {
217            // Allocate a new Buffer to hold the incoming frame.  We must write
218            // back the frame size value before continue on to read the indicated
219            // frame size minus the size of the AMQP frame size header value.
220            frame = new Buffer(nextExpectedReadSize);
221            frame.bigEndianEditor().writeInt(nextExpectedReadSize);
222
223            // Reset the length to total length as we do direct write after this.
224            frame.length = frame.data.length;
225        }
226    };
227}