/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.util;
018
019import java.io.IOException;
020import java.sql.SQLException;
021import java.util.Map;
022import java.util.concurrent.TimeUnit;
023import java.util.concurrent.atomic.AtomicBoolean;
024
025import org.apache.activemq.broker.BrokerService;
026import org.apache.activemq.broker.SuppressReplyException;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.Queue;
029import org.apache.activemq.broker.region.RegionBroker;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * @org.apache.xbean.XBean
036 */
037 public class DefaultIOExceptionHandler implements IOExceptionHandler {
038
039    private static final Logger LOG = LoggerFactory
040            .getLogger(DefaultIOExceptionHandler.class);
041    protected BrokerService broker;
042    private boolean ignoreAllErrors = false;
043    private boolean ignoreNoSpaceErrors = true;
044    private boolean ignoreSQLExceptions = true;
045    private boolean stopStartConnectors = false;
046    private String noSpaceMessage = "space";
047    private String sqlExceptionMessage = ""; // match all
048    private long resumeCheckSleepPeriod = 5*1000;
049    private final AtomicBoolean handlingException = new AtomicBoolean(false);
050
051    @Override
052    public void handle(IOException exception) {
053        if (ignoreAllErrors) {
054            LOG.info("Ignoring IO exception, " + exception, exception);
055            return;
056        }
057
058        if (ignoreNoSpaceErrors) {
059            Throwable cause = exception;
060            while (cause != null && cause instanceof IOException) {
061                String message = cause.getMessage();
062                if (message != null && message.contains(noSpaceMessage)) {
063                    LOG.info("Ignoring no space left exception, " + exception, exception);
064                    return;
065                }
066                cause = cause.getCause();
067            }
068        }
069
070        if (ignoreSQLExceptions) {
071            Throwable cause = exception;
072            while (cause != null) {
073                if (cause instanceof SQLException) {
074                    String message = cause.getMessage();
075
076                    if (message == null) {
077                        message = "";
078                    }
079
080                    if (message.contains(sqlExceptionMessage)) {
081                        LOG.info("Ignoring SQLException, " + exception, cause);
082                        return;
083                    }
084                }
085                cause = cause.getCause();
086            }
087        }
088
089        if (stopStartConnectors) {
090            if (handlingException.compareAndSet(false, true)) {
091                LOG.info("Initiating stop/restart of transports on " + broker + " due to IO exception, " + exception, exception);
092
093                new Thread("IOExceptionHandler: stop transports") {
094                    @Override
095                    public void run() {
096                        try {
097                            ServiceStopper stopper = new ServiceStopper();
098                            broker.stopAllConnectors(stopper);
099                            LOG.info("Successfully stopped transports on " + broker);
100                        } catch (Exception e) {
101                            LOG.warn("Failure occurred while stopping broker connectors", e);
102                        } finally {
103                            // resume again
104                            new Thread("IOExceptionHandler: restart transports") {
105                                @Override
106                                public void run() {
107                                    try {
108                                        while (hasLockOwnership() && isPersistenceAdapterDown()) {
109                                            LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
110                                            TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
111                                        }
112                                        if (hasLockOwnership()) {
113                                            Map<ActiveMQDestination, Destination> destinations = ((RegionBroker)broker.getRegionBroker()).getDestinationMap();
114                                            for (Destination destination : destinations.values()) {
115
116                                                if (destination instanceof Queue) {
117                                                    Queue queue = (Queue)destination;
118                                                    if (queue.isResetNeeded()) {
119                                                        queue.clearPendingMessages();
120                                                    }
121                                                }
122                                            }
123                                            broker.startAllConnectors();
124                                            LOG.info("Successfully restarted transports on " + broker);
125                                        }
126                                    } catch (Exception e) {
127                                        LOG.warn("Stopping " + broker + " due to failure restarting transports", e);
128                                        stopBroker(e);
129                                    } finally {
130                                        handlingException.compareAndSet(true, false);
131                                    }
132                                }
133
134                                private boolean isPersistenceAdapterDown() {
135                                    boolean checkpointSuccess = false;
136                                    try {
137                                        broker.getPersistenceAdapter().checkpoint(true);
138                                        checkpointSuccess = true;
139                                    } catch (Throwable ignored) {
140                                    }
141                                    return !checkpointSuccess;
142                                }
143                            }.start();
144
145
146                        }
147                    }
148                }.start();
149            }
150
151            throw new SuppressReplyException("Stop/RestartTransportsInitiated", exception);
152        }
153
154        if (handlingException.compareAndSet(false, true)) {
155            stopBroker(exception);
156        }
157
158        // we don't want to propagate the exception back to the client
159        // They will see a delay till they see a disconnect via socket.close
160        // at which point failover: can kick in.
161        throw new SuppressReplyException("ShutdownBrokerInitiated", exception);
162    }
163
164    private void stopBroker(Exception exception) {
165        LOG.info("Stopping " + broker + " due to exception, " + exception, exception);
166        new Thread("IOExceptionHandler: stopping " + broker) {
167            @Override
168            public void run() {
169                try {
170                    if( broker.isRestartAllowed() ) {
171                        broker.requestRestart();
172                    }
173                    broker.stop();
174                } catch (Exception e) {
175                    LOG.warn("Failure occurred while stopping broker", e);
176                }
177            }
178        }.start();
179    }
180
181    protected boolean hasLockOwnership() throws IOException {
182        return true;
183    }
184
185    @Override
186    public void setBrokerService(BrokerService broker) {
187        this.broker = broker;
188    }
189
190    public boolean isIgnoreAllErrors() {
191        return ignoreAllErrors;
192    }
193
194    public void setIgnoreAllErrors(boolean ignoreAllErrors) {
195        this.ignoreAllErrors = ignoreAllErrors;
196    }
197
198    public boolean isIgnoreNoSpaceErrors() {
199        return ignoreNoSpaceErrors;
200    }
201
202    public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
203        this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
204    }
205
206    public String getNoSpaceMessage() {
207        return noSpaceMessage;
208    }
209
210    public void setNoSpaceMessage(String noSpaceMessage) {
211        this.noSpaceMessage = noSpaceMessage;
212    }
213
214    public boolean isIgnoreSQLExceptions() {
215        return ignoreSQLExceptions;
216    }
217
218    public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
219        this.ignoreSQLExceptions = ignoreSQLExceptions;
220    }
221
222    public String getSqlExceptionMessage() {
223        return sqlExceptionMessage;
224    }
225
226    public void setSqlExceptionMessage(String sqlExceptionMessage) {
227        this.sqlExceptionMessage = sqlExceptionMessage;
228    }
229
230    public boolean isStopStartConnectors() {
231        return stopStartConnectors;
232    }
233
234    public void setStopStartConnectors(boolean stopStartConnectors) {
235        this.stopStartConnectors = stopStartConnectors;
236    }
237
238    public long getResumeCheckSleepPeriod() {
239        return resumeCheckSleepPeriod;
240    }
241
242    public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
243        this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
244    }
245}