001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.plugin;
018
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.plugin.jmx.RuntimeConfigurationView;
import org.apache.activemq.schema.core.DtoBroker;
import org.apache.activemq.spring.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.xml.sax.SAXException;

import javax.management.JMException;
import javax.management.ObjectName;
import javax.xml.XMLConstants;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
058
059public class RuntimeConfigurationBroker extends BrokerFilter {
060
061    public static final Logger LOG = LoggerFactory.getLogger(RuntimeConfigurationBroker.class);
062    public static final String objectNamePropsAppendage = ",service=RuntimeConfiguration,name=Plugin";
063    private final ReentrantReadWriteLock addDestinationBarrier = new ReentrantReadWriteLock();
064    private final ReentrantReadWriteLock addConnectionBarrier = new ReentrantReadWriteLock();
065    PropertiesPlaceHolderUtil placeHolderUtil = null;
066    private long checkPeriod;
067    private long lastModified = -1;
068    private Resource configToMonitor;
069    private DtoBroker currentConfiguration;
070    private Runnable monitorTask;
071    protected ConcurrentLinkedQueue<Runnable> addDestinationWork = new ConcurrentLinkedQueue<Runnable>();
072    protected ConcurrentLinkedQueue<Runnable> addConnectionWork = new ConcurrentLinkedQueue<Runnable>();
073    private ObjectName objectName;
074    private String infoString;
075    private Schema schema;
076
077    public RuntimeConfigurationBroker(Broker next) {
078        super(next);
079    }
080
081    @Override
082    public void start() throws Exception {
083        super.start();
084        try {
085            BrokerContext brokerContext = next.getBrokerService().getBrokerContext();
086            if (brokerContext != null) {
087                configToMonitor = Utils.resourceFromString(brokerContext.getConfigurationUrl());
088                info("Configuration " + configToMonitor);
089            } else {
090                LOG.error("Null BrokerContext; impossible to determine configuration url resource from broker, updates cannot be tracked");
091            }
092        } catch (Exception error) {
093            LOG.error("failed to determine configuration url resource from broker, updates cannot be tracked", error);
094        }
095
096        currentConfiguration = loadConfiguration(configToMonitor);
097        monitorModification(configToMonitor);
098        registerMbean();
099    }
100
101    @Override
102    public void stop() throws Exception {
103        if (monitorTask != null) {
104            try {
105                this.getBrokerService().getScheduler().cancel(monitorTask);
106            } catch (Exception letsNotStopStop) {
107                LOG.warn("Failed to cancel config monitor task", letsNotStopStop);
108            }
109        }
110        unregisterMbean();
111        super.stop();
112    }
113
114    private void registerMbean() {
115        if (getBrokerService().isUseJmx()) {
116            ManagementContext managementContext = getBrokerService().getManagementContext();
117            try {
118                objectName = new ObjectName(getBrokerService().getBrokerObjectName().toString() + objectNamePropsAppendage);
119                managementContext.registerMBean(new RuntimeConfigurationView(this), objectName);
120            } catch (Exception ignored) {
121                LOG.debug("failed to register RuntimeConfigurationMBean", ignored);
122            }
123        }
124    }
125
126    private void unregisterMbean() {
127        if (objectName != null) {
128            try {
129                getBrokerService().getManagementContext().unregisterMBean(objectName);
130            } catch (JMException ignored) {
131            }
132        }
133    }
134
135    // modification to virtual destinations interceptor needs exclusive access to destination add
136    @Override
137    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception {
138        Runnable work = addDestinationWork.poll();
139        if (work != null) {
140            try {
141                addDestinationBarrier.writeLock().lockInterruptibly();
142                do {
143                    work.run();
144                    work = addDestinationWork.poll();
145                } while (work != null);
146                return super.addDestination(context, destination, createIfTemporary);
147            } finally {
148                addDestinationBarrier.writeLock().unlock();
149            }
150        } else {
151            try {
152                addDestinationBarrier.readLock().lockInterruptibly();
153                return super.addDestination(context, destination, createIfTemporary);
154            } finally {
155                addDestinationBarrier.readLock().unlock();
156            }
157        }
158    }
159
160    // modification to authentication plugin needs exclusive access to connection add
161    @Override
162    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
163        Runnable work = addConnectionWork.poll();
164        if (work != null) {
165            try {
166                addConnectionBarrier.writeLock().lockInterruptibly();
167                do {
168                    work.run();
169                    work = addConnectionWork.poll();
170                } while (work != null);
171                super.addConnection(context, info);
172            } finally {
173                addConnectionBarrier.writeLock().unlock();
174            }
175        } else {
176            try {
177                addConnectionBarrier.readLock().lockInterruptibly();
178                super.addConnection(context, info);
179            } finally {
180                addConnectionBarrier.readLock().unlock();
181            }
182        }
183    }
184
185    public String updateNow() {
186        LOG.info("Manual configuration update triggered");
187        infoString = "";
188        applyModifications(configToMonitor);
189        String result = infoString;
190        infoString = null;
191        return result;
192    }
193
194    private void monitorModification(final Resource configToMonitor) {
195        monitorTask = new Runnable() {
196            @Override
197            public void run() {
198                try {
199                    if (configToMonitor.lastModified() > lastModified) {
200                        applyModifications(configToMonitor);
201                    }
202                } catch (Throwable e) {
203                    LOG.error("Failed to determine lastModified time on configuration: " + configToMonitor, e);
204                }
205            }
206        };
207        if (lastModified > 0 && checkPeriod > 0) {
208            this.getBrokerService().getScheduler().executePeriodically(monitorTask, checkPeriod);
209            info("Monitoring for updates (every " + checkPeriod + "millis) : " + configToMonitor + ", lastUpdate: " + new Date(lastModified));
210        }
211    }
212
213    protected void debug(String s) {
214        LOG.debug(s);
215    }
216
217    protected void info(String s) {
218        LOG.info(filterPasswords(s));
219        if (infoString != null) {
220            infoString += s;
221            infoString += ";";
222        }
223    }
224
225    protected void info(String s, Throwable t) {
226        LOG.info(filterPasswords(s), t);
227        if (infoString != null) {
228            infoString += s;
229            infoString += ", " + t;
230            infoString += ";";
231        }
232    }
233
234    private void applyModifications(Resource configToMonitor) {
235        DtoBroker changed = loadConfiguration(configToMonitor);
236        if (changed != null && !currentConfiguration.equals(changed)) {
237            LOG.info("change in " + configToMonitor + " at: " + new Date(lastModified));
238            LOG.debug("current:" + filterPasswords(currentConfiguration));
239            LOG.debug("new    :" + filterPasswords(changed));
240            processSelectiveChanges(currentConfiguration, changed);
241            currentConfiguration = changed;
242        } else {
243            info("No material change to configuration in " + configToMonitor + " at: " + new Date(lastModified));
244        }
245    }
246
247    private void processSelectiveChanges(DtoBroker currentConfiguration, DtoBroker modifiedConfiguration) {
248
249        for (Class upDatable : new Class[]{
250                DtoBroker.DestinationPolicy.class,
251                DtoBroker.NetworkConnectors.class,
252                DtoBroker.DestinationInterceptors.class,
253                DtoBroker.Plugins.class,
254                DtoBroker.Destinations.class}) {
255            processChanges(currentConfiguration, modifiedConfiguration, upDatable);
256        }
257    }
258
259    private void processChanges(DtoBroker currentConfiguration, DtoBroker modifiedConfiguration, Class upDatable) {
260        ConfigurationProcessor processor = ProcessorFactory.createProcessor(this, upDatable);
261        processor.processChanges(currentConfiguration, modifiedConfiguration);
262    }
263
264    Pattern matchPassword = Pattern.compile("password=.*,");
265    private String filterPasswords(Object toEscape) {
266        return matchPassword.matcher(toEscape.toString()).replaceAll("password=???,");
267    }
268
269    private DtoBroker loadConfiguration(Resource configToMonitor) {
270        DtoBroker jaxbConfig = null;
271        if (configToMonitor != null) {
272            try {
273                JAXBContext context = JAXBContext.newInstance(DtoBroker.class);
274                Unmarshaller unMarshaller = context.createUnmarshaller();
275                unMarshaller.setSchema(getSchema());
276
277                // skip beans and pull out the broker node to validate
278                DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
279                dbf.setNamespaceAware(true);
280                DocumentBuilder db = dbf.newDocumentBuilder();
281                Document doc = db.parse(configToMonitor.getInputStream());
282                Node brokerRootNode = doc.getElementsByTagNameNS("*","broker").item(0);
283
284                if (brokerRootNode != null) {
285
286                    JAXBElement<DtoBroker> brokerJAXBElement =
287                            unMarshaller.unmarshal(brokerRootNode, DtoBroker.class);
288                    jaxbConfig = brokerJAXBElement.getValue();
289
290                    // if we can parse we can track mods
291                    lastModified = configToMonitor.lastModified();
292
293                    loadPropertiesPlaceHolderSupport(doc);
294
295                } else {
296                    info("Failed to find 'broker' element by tag in: " + configToMonitor);
297                }
298
299            } catch (IOException e) {
300                info("Failed to access: " + configToMonitor, e);
301            } catch (JAXBException e) {
302                info("Failed to parse: " + configToMonitor, e);
303            } catch (ParserConfigurationException e) {
304                info("Failed to document parse: " + configToMonitor, e);
305            } catch (SAXException e) {
306                info("Failed to find broker element in: " + configToMonitor, e);
307            } catch (Exception e) {
308                info("Unexpected exception during load of: " + configToMonitor, e);
309            }
310        }
311        return jaxbConfig;
312    }
313
314    private void loadPropertiesPlaceHolderSupport(Document doc) {
315        BrokerContext brokerContext = getBrokerService().getBrokerContext();
316        if (brokerContext != null) {
317            Properties initialProperties = new Properties(System.getProperties());
318            placeHolderUtil = new PropertiesPlaceHolderUtil(initialProperties);
319            placeHolderUtil.mergeProperties(doc, initialProperties, brokerContext);
320        }
321    }
322
323    private Schema getSchema() throws SAXException, IOException {
324        if (schema == null) {
325            SchemaFactory schemaFactory = SchemaFactory.newInstance(
326                    XMLConstants.W3C_XML_SCHEMA_NS_URI);
327
328            ArrayList<StreamSource> schemas = new ArrayList<StreamSource>();
329            schemas.add(new StreamSource(getClass().getResource("/activemq.xsd").toExternalForm()));
330            schemas.add(new StreamSource(getClass().getResource("/org/springframework/beans/factory/xml/spring-beans-3.0.xsd").toExternalForm()));
331            schema = schemaFactory.newSchema(schemas.toArray(new Source[]{}));
332        }
333        return schema;
334    }
335
336    public long getLastModified() {
337        return lastModified;
338    }
339
340    public Resource getConfigToMonitor() {
341        return configToMonitor;
342    }
343
344    public long getCheckPeriod() {
345        return checkPeriod;
346    }
347
348    public void setCheckPeriod(long checkPeriod) {
349        this.checkPeriod = checkPeriod;
350    }
351
352}