001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.util; 018 019import java.io.IOException; 020import java.sql.SQLException; 021import java.util.Map; 022import java.util.concurrent.TimeUnit; 023import java.util.concurrent.atomic.AtomicBoolean; 024 025import org.apache.activemq.broker.BrokerService; 026import org.apache.activemq.broker.SuppressReplyException; 027import org.apache.activemq.broker.region.Destination; 028import org.apache.activemq.broker.region.Queue; 029import org.apache.activemq.broker.region.RegionBroker; 030import org.apache.activemq.command.ActiveMQDestination; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * @org.apache.xbean.XBean 036 */ 037 public class DefaultIOExceptionHandler implements IOExceptionHandler { 038 039 private static final Logger LOG = LoggerFactory 040 .getLogger(DefaultIOExceptionHandler.class); 041 protected BrokerService broker; 042 private boolean ignoreAllErrors = false; 043 private boolean ignoreNoSpaceErrors = true; 044 private boolean ignoreSQLExceptions = true; 045 private boolean stopStartConnectors = false; 046 private String noSpaceMessage = "space"; 047 private String sqlExceptionMessage = ""; // match all 048 private long resumeCheckSleepPeriod = 5*1000; 049 private final AtomicBoolean handlingException = new AtomicBoolean(false); 050 051 @Override 052 public void handle(IOException exception) { 053 if (ignoreAllErrors) { 054 LOG.info("Ignoring IO exception, " + exception, exception); 055 return; 056 } 057 058 if (ignoreNoSpaceErrors) { 059 Throwable cause = exception; 060 while (cause != null && cause instanceof IOException) { 061 String message = cause.getMessage(); 062 if (message != null && message.contains(noSpaceMessage)) { 063 LOG.info("Ignoring no space left exception, " + exception, exception); 064 return; 065 } 066 cause = cause.getCause(); 067 } 068 } 069 070 if (ignoreSQLExceptions) { 071 Throwable cause = exception; 072 while (cause != null) { 073 if (cause instanceof SQLException) { 074 String message = cause.getMessage(); 075 076 if (message == null) { 077 message = ""; 078 } 079 080 if (message.contains(sqlExceptionMessage)) { 081 LOG.info("Ignoring SQLException, " + exception, cause); 082 return; 083 } 084 } 085 cause = cause.getCause(); 086 } 087 } 088 089 if (stopStartConnectors) { 090 if (handlingException.compareAndSet(false, true)) { 091 LOG.info("Initiating stop/restart of transports on " + broker + " due to IO exception, " + exception, exception); 092 093 new Thread("IOExceptionHandler: stop transports") { 094 @Override 095 public void run() { 096 try { 097 ServiceStopper stopper = new ServiceStopper(); 098 broker.stopAllConnectors(stopper); 099 LOG.info("Successfully stopped transports on " + broker); 100 } catch (Exception e) { 101 LOG.warn("Failure occurred while stopping broker connectors", e); 102 } finally { 103 // resume again 104 new Thread("IOExceptionHandler: restart transports") { 105 @Override 106 public void run() { 107 try { 108 while (hasLockOwnership() && isPersistenceAdapterDown()) { 109 LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports"); 110 TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod); 111 } 112 if (hasLockOwnership()) { 113 Map<ActiveMQDestination, Destination> destinations = ((RegionBroker)broker.getRegionBroker()).getDestinationMap(); 114 for (Destination destination : destinations.values()) { 115 116 if (destination instanceof Queue) { 117 Queue queue = (Queue)destination; 118 if (queue.isResetNeeded()) { 119 queue.clearPendingMessages(); 120 } 121 } 122 } 123 broker.startAllConnectors(); 124 LOG.info("Successfully restarted transports on " + broker); 125 } 126 } catch (Exception e) { 127 LOG.warn("Stopping " + broker + " due to failure restarting transports", e); 128 stopBroker(e); 129 } finally { 130 handlingException.compareAndSet(true, false); 131 } 132 } 133 134 private boolean isPersistenceAdapterDown() { 135 boolean checkpointSuccess = false; 136 try { 137 broker.getPersistenceAdapter().checkpoint(true); 138 checkpointSuccess = true; 139 } catch (Throwable ignored) { 140 } 141 return !checkpointSuccess; 142 } 143 }.start(); 144 145 146 } 147 } 148 }.start(); 149 } 150 151 throw new SuppressReplyException("Stop/RestartTransportsInitiated", exception); 152 } 153 154 if (handlingException.compareAndSet(false, true)) { 155 stopBroker(exception); 156 } 157 158 // we don't want to propagate the exception back to the client 159 // They will see a delay till they see a disconnect via socket.close 160 // at which point failover: can kick in. 161 throw new SuppressReplyException("ShutdownBrokerInitiated", exception); 162 } 163 164 private void stopBroker(Exception exception) { 165 LOG.info("Stopping " + broker + " due to exception, " + exception, exception); 166 new Thread("IOExceptionHandler: stopping " + broker) { 167 @Override 168 public void run() { 169 try { 170 if( broker.isRestartAllowed() ) { 171 broker.requestRestart(); 172 } 173 broker.stop(); 174 } catch (Exception e) { 175 LOG.warn("Failure occurred while stopping broker", e); 176 } 177 } 178 }.start(); 179 } 180 181 protected boolean hasLockOwnership() throws IOException { 182 return true; 183 } 184 185 @Override 186 public void setBrokerService(BrokerService broker) { 187 this.broker = broker; 188 } 189 190 public boolean isIgnoreAllErrors() { 191 return ignoreAllErrors; 192 } 193 194 public void setIgnoreAllErrors(boolean ignoreAllErrors) { 195 this.ignoreAllErrors = ignoreAllErrors; 196 } 197 198 public boolean isIgnoreNoSpaceErrors() { 199 return ignoreNoSpaceErrors; 200 } 201 202 public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) { 203 this.ignoreNoSpaceErrors = ignoreNoSpaceErrors; 204 } 205 206 public String getNoSpaceMessage() { 207 return noSpaceMessage; 208 } 209 210 public void setNoSpaceMessage(String noSpaceMessage) { 211 this.noSpaceMessage = noSpaceMessage; 212 } 213 214 public boolean isIgnoreSQLExceptions() { 215 return ignoreSQLExceptions; 216 } 217 218 public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) { 219 this.ignoreSQLExceptions = ignoreSQLExceptions; 220 } 221 222 public String getSqlExceptionMessage() { 223 return sqlExceptionMessage; 224 } 225 226 public void setSqlExceptionMessage(String sqlExceptionMessage) { 227 this.sqlExceptionMessage = sqlExceptionMessage; 228 } 229 230 public boolean isStopStartConnectors() { 231 return stopStartConnectors; 232 } 233 234 public void setStopStartConnectors(boolean stopStartConnectors) { 235 this.stopStartConnectors = stopStartConnectors; 236 } 237 238 public long getResumeCheckSleepPeriod() { 239 return resumeCheckSleepPeriod; 240 } 241 242 public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) { 243 this.resumeCheckSleepPeriod = resumeCheckSleepPeriod; 244 } 245}