001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.plugin; 018 019import org.apache.activemq.broker.Broker; 020import org.apache.activemq.broker.BrokerContext; 021import org.apache.activemq.broker.BrokerFilter; 022import org.apache.activemq.broker.ConnectionContext; 023import org.apache.activemq.broker.jmx.ManagementContext; 024import org.apache.activemq.broker.region.Destination; 025import org.apache.activemq.command.ActiveMQDestination; 026import org.apache.activemq.command.ConnectionInfo; 027import org.apache.activemq.plugin.jmx.RuntimeConfigurationView; 028import org.apache.activemq.schema.core.DtoBroker; 029import org.apache.activemq.spring.Utils; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032import org.springframework.core.io.Resource; 033import org.w3c.dom.Document; 034import org.w3c.dom.Node; 035import org.xml.sax.SAXException; 036 037import javax.management.JMException; 038import javax.management.ObjectName; 039import javax.xml.XMLConstants; 040import javax.xml.bind.JAXBContext; 041import javax.xml.bind.JAXBElement; 042import javax.xml.bind.JAXBException; 043import javax.xml.bind.Unmarshaller; 044import javax.xml.parsers.DocumentBuilder; 045import javax.xml.parsers.DocumentBuilderFactory; 046import javax.xml.parsers.ParserConfigurationException; 047import javax.xml.transform.Source; 048import javax.xml.transform.stream.StreamSource; 049import javax.xml.validation.Schema; 050import javax.xml.validation.SchemaFactory; 051import java.io.IOException; 052import java.util.ArrayList; 053import java.util.Date; 054import java.util.Properties; 055import java.util.concurrent.ConcurrentLinkedQueue; 056import java.util.concurrent.locks.ReentrantReadWriteLock; 057import java.util.regex.Pattern; 058 059public class RuntimeConfigurationBroker extends BrokerFilter { 060 061 public static final Logger LOG = LoggerFactory.getLogger(RuntimeConfigurationBroker.class); 062 public static final String objectNamePropsAppendage = ",service=RuntimeConfiguration,name=Plugin"; 063 private final ReentrantReadWriteLock addDestinationBarrier = new ReentrantReadWriteLock(); 064 private final ReentrantReadWriteLock addConnectionBarrier = new ReentrantReadWriteLock(); 065 PropertiesPlaceHolderUtil placeHolderUtil = null; 066 private long checkPeriod; 067 private long lastModified = -1; 068 private Resource configToMonitor; 069 private DtoBroker currentConfiguration; 070 private Runnable monitorTask; 071 protected ConcurrentLinkedQueue<Runnable> addDestinationWork = new ConcurrentLinkedQueue<Runnable>(); 072 protected ConcurrentLinkedQueue<Runnable> addConnectionWork = new ConcurrentLinkedQueue<Runnable>(); 073 private ObjectName objectName; 074 private String infoString; 075 private Schema schema; 076 077 public RuntimeConfigurationBroker(Broker next) { 078 super(next); 079 } 080 081 @Override 082 public void start() throws Exception { 083 super.start(); 084 try { 085 BrokerContext brokerContext = next.getBrokerService().getBrokerContext(); 086 if (brokerContext != null) { 087 configToMonitor = Utils.resourceFromString(brokerContext.getConfigurationUrl()); 088 info("Configuration " + configToMonitor); 089 } else { 090 LOG.error("Null BrokerContext; impossible to determine configuration url resource from broker, updates cannot be tracked"); 091 } 092 } catch (Exception error) { 093 LOG.error("failed to determine configuration url resource from broker, updates cannot be tracked", error); 094 } 095 096 currentConfiguration = loadConfiguration(configToMonitor); 097 monitorModification(configToMonitor); 098 registerMbean(); 099 } 100 101 @Override 102 public void stop() throws Exception { 103 if (monitorTask != null) { 104 try { 105 this.getBrokerService().getScheduler().cancel(monitorTask); 106 } catch (Exception letsNotStopStop) { 107 LOG.warn("Failed to cancel config monitor task", letsNotStopStop); 108 } 109 } 110 unregisterMbean(); 111 super.stop(); 112 } 113 114 private void registerMbean() { 115 if (getBrokerService().isUseJmx()) { 116 ManagementContext managementContext = getBrokerService().getManagementContext(); 117 try { 118 objectName = new ObjectName(getBrokerService().getBrokerObjectName().toString() + objectNamePropsAppendage); 119 managementContext.registerMBean(new RuntimeConfigurationView(this), objectName); 120 } catch (Exception ignored) { 121 LOG.debug("failed to register RuntimeConfigurationMBean", ignored); 122 } 123 } 124 } 125 126 private void unregisterMbean() { 127 if (objectName != null) { 128 try { 129 getBrokerService().getManagementContext().unregisterMBean(objectName); 130 } catch (JMException ignored) { 131 } 132 } 133 } 134 135 // modification to virtual destinations interceptor needs exclusive access to destination add 136 @Override 137 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception { 138 Runnable work = addDestinationWork.poll(); 139 if (work != null) { 140 try { 141 addDestinationBarrier.writeLock().lockInterruptibly(); 142 do { 143 work.run(); 144 work = addDestinationWork.poll(); 145 } while (work != null); 146 return super.addDestination(context, destination, createIfTemporary); 147 } finally { 148 addDestinationBarrier.writeLock().unlock(); 149 } 150 } else { 151 try { 152 addDestinationBarrier.readLock().lockInterruptibly(); 153 return super.addDestination(context, destination, createIfTemporary); 154 } finally { 155 addDestinationBarrier.readLock().unlock(); 156 } 157 } 158 } 159 160 // modification to authentication plugin needs exclusive access to connection add 161 @Override 162 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 163 Runnable work = addConnectionWork.poll(); 164 if (work != null) { 165 try { 166 addConnectionBarrier.writeLock().lockInterruptibly(); 167 do { 168 work.run(); 169 work = addConnectionWork.poll(); 170 } while (work != null); 171 super.addConnection(context, info); 172 } finally { 173 addConnectionBarrier.writeLock().unlock(); 174 } 175 } else { 176 try { 177 addConnectionBarrier.readLock().lockInterruptibly(); 178 super.addConnection(context, info); 179 } finally { 180 addConnectionBarrier.readLock().unlock(); 181 } 182 } 183 } 184 185 public String updateNow() { 186 LOG.info("Manual configuration update triggered"); 187 infoString = ""; 188 applyModifications(configToMonitor); 189 String result = infoString; 190 infoString = null; 191 return result; 192 } 193 194 private void monitorModification(final Resource configToMonitor) { 195 monitorTask = new Runnable() { 196 @Override 197 public void run() { 198 try { 199 if (configToMonitor.lastModified() > lastModified) { 200 applyModifications(configToMonitor); 201 } 202 } catch (Throwable e) { 203 LOG.error("Failed to determine lastModified time on configuration: " + configToMonitor, e); 204 } 205 } 206 }; 207 if (lastModified > 0 && checkPeriod > 0) { 208 this.getBrokerService().getScheduler().executePeriodically(monitorTask, checkPeriod); 209 info("Monitoring for updates (every " + checkPeriod + "millis) : " + configToMonitor + ", lastUpdate: " + new Date(lastModified)); 210 } 211 } 212 213 protected void debug(String s) { 214 LOG.debug(s); 215 } 216 217 protected void info(String s) { 218 LOG.info(filterPasswords(s)); 219 if (infoString != null) { 220 infoString += s; 221 infoString += ";"; 222 } 223 } 224 225 protected void info(String s, Throwable t) { 226 LOG.info(filterPasswords(s), t); 227 if (infoString != null) { 228 infoString += s; 229 infoString += ", " + t; 230 infoString += ";"; 231 } 232 } 233 234 private void applyModifications(Resource configToMonitor) { 235 DtoBroker changed = loadConfiguration(configToMonitor); 236 if (changed != null && !currentConfiguration.equals(changed)) { 237 LOG.info("change in " + configToMonitor + " at: " + new Date(lastModified)); 238 LOG.debug("current:" + filterPasswords(currentConfiguration)); 239 LOG.debug("new :" + filterPasswords(changed)); 240 processSelectiveChanges(currentConfiguration, changed); 241 currentConfiguration = changed; 242 } else { 243 info("No material change to configuration in " + configToMonitor + " at: " + new Date(lastModified)); 244 } 245 } 246 247 private void processSelectiveChanges(DtoBroker currentConfiguration, DtoBroker modifiedConfiguration) { 248 249 for (Class upDatable : new Class[]{ 250 DtoBroker.DestinationPolicy.class, 251 DtoBroker.NetworkConnectors.class, 252 DtoBroker.DestinationInterceptors.class, 253 DtoBroker.Plugins.class, 254 DtoBroker.Destinations.class}) { 255 processChanges(currentConfiguration, modifiedConfiguration, upDatable); 256 } 257 } 258 259 private void processChanges(DtoBroker currentConfiguration, DtoBroker modifiedConfiguration, Class upDatable) { 260 ConfigurationProcessor processor = ProcessorFactory.createProcessor(this, upDatable); 261 processor.processChanges(currentConfiguration, modifiedConfiguration); 262 } 263 264 Pattern matchPassword = Pattern.compile("password=.*,"); 265 private String filterPasswords(Object toEscape) { 266 return matchPassword.matcher(toEscape.toString()).replaceAll("password=???,"); 267 } 268 269 private DtoBroker loadConfiguration(Resource configToMonitor) { 270 DtoBroker jaxbConfig = null; 271 if (configToMonitor != null) { 272 try { 273 JAXBContext context = JAXBContext.newInstance(DtoBroker.class); 274 Unmarshaller unMarshaller = context.createUnmarshaller(); 275 unMarshaller.setSchema(getSchema()); 276 277 // skip beans and pull out the broker node to validate 278 DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); 279 dbf.setNamespaceAware(true); 280 DocumentBuilder db = dbf.newDocumentBuilder(); 281 Document doc = db.parse(configToMonitor.getInputStream()); 282 Node brokerRootNode = doc.getElementsByTagNameNS("*","broker").item(0); 283 284 if (brokerRootNode != null) { 285 286 JAXBElement<DtoBroker> brokerJAXBElement = 287 unMarshaller.unmarshal(brokerRootNode, DtoBroker.class); 288 jaxbConfig = brokerJAXBElement.getValue(); 289 290 // if we can parse we can track mods 291 lastModified = configToMonitor.lastModified(); 292 293 loadPropertiesPlaceHolderSupport(doc); 294 295 } else { 296 info("Failed to find 'broker' element by tag in: " + configToMonitor); 297 } 298 299 } catch (IOException e) { 300 info("Failed to access: " + configToMonitor, e); 301 } catch (JAXBException e) { 302 info("Failed to parse: " + configToMonitor, e); 303 } catch (ParserConfigurationException e) { 304 info("Failed to document parse: " + configToMonitor, e); 305 } catch (SAXException e) { 306 info("Failed to find broker element in: " + configToMonitor, e); 307 } catch (Exception e) { 308 info("Unexpected exception during load of: " + configToMonitor, e); 309 } 310 } 311 return jaxbConfig; 312 } 313 314 private void loadPropertiesPlaceHolderSupport(Document doc) { 315 BrokerContext brokerContext = getBrokerService().getBrokerContext(); 316 if (brokerContext != null) { 317 Properties initialProperties = new Properties(System.getProperties()); 318 placeHolderUtil = new PropertiesPlaceHolderUtil(initialProperties); 319 placeHolderUtil.mergeProperties(doc, initialProperties, brokerContext); 320 } 321 } 322 323 private Schema getSchema() throws SAXException, IOException { 324 if (schema == null) { 325 SchemaFactory schemaFactory = SchemaFactory.newInstance( 326 XMLConstants.W3C_XML_SCHEMA_NS_URI); 327 328 ArrayList<StreamSource> schemas = new ArrayList<StreamSource>(); 329 schemas.add(new StreamSource(getClass().getResource("/activemq.xsd").toExternalForm())); 330 schemas.add(new StreamSource(getClass().getResource("/org/springframework/beans/factory/xml/spring-beans-3.0.xsd").toExternalForm())); 331 schema = schemaFactory.newSchema(schemas.toArray(new Source[]{})); 332 } 333 return schema; 334 } 335 336 public long getLastModified() { 337 return lastModified; 338 } 339 340 public Resource getConfigToMonitor() { 341 return configToMonitor; 342 } 343 344 public long getCheckPeriod() { 345 return checkPeriod; 346 } 347 348 public void setCheckPeriod(long checkPeriod) { 349 this.checkPeriod = checkPeriod; 350 } 351 352}