001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.io.IOException; 020 021import org.apache.activemq.command.ActiveMQDestination; 022import org.apache.activemq.command.ConsumerId; 023import org.apache.activemq.command.ConsumerInfo; 024import org.apache.activemq.filter.DestinationFilter; 025import org.apache.activemq.transport.Transport; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029/** 030 * Consolidates subscriptions 031 */ 032public class DurableConduitBridge extends ConduitBridge { 033 private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class); 034 035 public String toString() { 036 return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName(); 037 } 038 /** 039 * Constructor 040 * 041 * @param configuration 042 * 043 * @param localBroker 044 * @param remoteBroker 045 */ 046 public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker, 047 Transport remoteBroker) { 048 super(configuration, localBroker, remoteBroker); 049 } 050 051 /** 052 * Subscriptions for these destinations are always created 053 * 054 */ 055 protected void setupStaticDestinations() { 056 super.setupStaticDestinations(); 057 ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations; 058 if (dests != null) { 059 for (ActiveMQDestination dest : dests) { 060 if (isPermissableDestination(dest) && !doesConsumerExist(dest)) { 061 DemandSubscription sub = createDemandSubscription(dest); 062 sub.setStaticallyIncluded(true); 063 if (dest.isTopic()) { 064 sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest)); 065 } 066 try { 067 addSubscription(sub); 068 } catch (IOException e) { 069 LOG.error("Failed to add static destination {}", dest, e); 070 } 071 LOG.trace("Forwarding messages for durable destination: {}", dest); 072 } 073 } 074 } 075 } 076 077 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 078 if (addToAlreadyInterestedConsumers(info)) { 079 return null; // don't want this subscription added 080 } 081 //add our original id to ourselves 082 info.addNetworkConsumerId(info.getConsumerId()); 083 084 if (info.isDurable()) { 085 // set the subscriber name to something reproducible 086 info.setSubscriptionName(getSubscriberName(info.getDestination())); 087 // and override the consumerId with something unique so that it won't 088 // be removed if the durable subscriber (at the other end) goes away 089 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), 090 consumerIdGenerator.getNextSequenceId())); 091 } 092 info.setSelector(null); 093 return doCreateDemandSubscription(info); 094 } 095 096 protected String getSubscriberName(ActiveMQDestination dest) { 097 String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName(); 098 return subscriberName; 099 } 100 101 protected boolean doesConsumerExist(ActiveMQDestination dest) { 102 DestinationFilter filter = DestinationFilter.parseFilter(dest); 103 for (DemandSubscription ds : subscriptionMapByLocalId.values()) { 104 if (filter.matches(ds.getLocalInfo().getDestination())) { 105 return true; 106 } 107 } 108 return false; 109 } 110}