001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.plist; 018 019import org.apache.activemq.broker.BrokerService; 020import org.apache.activemq.broker.BrokerServiceAware; 021import org.apache.activemq.openwire.OpenWireFormat; 022import org.apache.activemq.store.JournaledStore; 023import org.apache.activemq.store.PList; 024import org.apache.activemq.store.PListStore; 025import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; 026import org.apache.activemq.store.kahadb.disk.journal.Journal; 027import org.apache.activemq.store.kahadb.disk.journal.Location; 028import org.apache.activemq.store.kahadb.disk.page.Page; 029import org.apache.activemq.store.kahadb.disk.page.PageFile; 030import org.apache.activemq.store.kahadb.disk.page.Transaction; 031import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; 032import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; 033import org.apache.activemq.thread.Scheduler; 034import org.apache.activemq.util.*; 035import org.apache.activemq.wireformat.WireFormat; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import java.io.DataInput; 040import java.io.DataOutput; 041import java.io.File; 042import java.io.IOException; 043import java.util.*; 044import java.util.Map.Entry; 045 046/** 047 * @org.apache.xbean.XBean 048 */ 049public class PListStoreImpl extends ServiceSupport implements BrokerServiceAware, Runnable, PListStore, JournaledStore { 050 static final Logger LOG = LoggerFactory.getLogger(PListStoreImpl.class); 051 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; 052 053 static final int CLOSED_STATE = 1; 054 static final int OPEN_STATE = 2; 055 056 private File directory; 057 PageFile pageFile; 058 private Journal journal; 059 private LockFile lockFile; 060 private boolean failIfDatabaseIsLocked; 061 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 062 private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 063 private boolean enableIndexWriteAsync = false; 064 private boolean initialized = false; 065 private boolean lazyInit = true; 066 // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 067 MetaData metaData = new MetaData(this); 068 final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); 069 Map<String, PListImpl> persistentLists = new HashMap<String, PListImpl>(); 070 final Object indexLock = new Object(); 071 private Scheduler scheduler; 072 private long cleanupInterval = 30000; 073 074 private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE; 075 private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE; 076 private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 077 private boolean indexEnablePageCaching = true; 078 079 public Object getIndexLock() { 080 return indexLock; 081 } 082 083 @Override 084 public void setBrokerService(BrokerService brokerService) { 085 this.scheduler = brokerService.getScheduler(); 086 } 087 088 public int getIndexPageSize() { 089 return indexPageSize; 090 } 091 092 public int getIndexCacheSize() { 093 return indexCacheSize; 094 } 095 096 public int getIndexWriteBatchSize() { 097 return indexWriteBatchSize; 098 } 099 100 public void setIndexPageSize(int indexPageSize) { 101 this.indexPageSize = indexPageSize; 102 } 103 104 public void setIndexCacheSize(int indexCacheSize) { 105 this.indexCacheSize = indexCacheSize; 106 } 107 108 public void setIndexWriteBatchSize(int indexWriteBatchSize) { 109 this.indexWriteBatchSize = indexWriteBatchSize; 110 } 111 112 public boolean getIndexEnablePageCaching() { 113 return indexEnablePageCaching; 114 } 115 116 public void setIndexEnablePageCaching(boolean indexEnablePageCaching) { 117 this.indexEnablePageCaching = indexEnablePageCaching; 118 } 119 120 protected class MetaData { 121 protected MetaData(PListStoreImpl store) { 122 this.store = store; 123 } 124 125 private final PListStoreImpl store; 126 Page<MetaData> page; 127 BTreeIndex<String, PListImpl> lists; 128 129 void createIndexes(Transaction tx) throws IOException { 130 this.lists = new BTreeIndex<String, PListImpl>(pageFile, tx.allocate().getPageId()); 131 } 132 133 void load(Transaction tx) throws IOException { 134 this.lists.setKeyMarshaller(StringMarshaller.INSTANCE); 135 this.lists.setValueMarshaller(new PListMarshaller(this.store)); 136 this.lists.load(tx); 137 } 138 139 void loadLists(Transaction tx, Map<String, PListImpl> lists) throws IOException { 140 for (Iterator<Entry<String, PListImpl>> i = this.lists.iterator(tx); i.hasNext();) { 141 Entry<String, PListImpl> entry = i.next(); 142 entry.getValue().load(tx); 143 lists.put(entry.getKey(), entry.getValue()); 144 } 145 } 146 147 public void read(DataInput is) throws IOException { 148 this.lists = new BTreeIndex<String, PListImpl>(pageFile, is.readLong()); 149 this.lists.setKeyMarshaller(StringMarshaller.INSTANCE); 150 this.lists.setValueMarshaller(new PListMarshaller(this.store)); 151 } 152 153 public void write(DataOutput os) throws IOException { 154 os.writeLong(this.lists.getPageId()); 155 } 156 } 157 158 class MetaDataMarshaller extends VariableMarshaller<MetaData> { 159 private final PListStoreImpl store; 160 161 MetaDataMarshaller(PListStoreImpl store) { 162 this.store = store; 163 } 164 public MetaData readPayload(DataInput dataIn) throws IOException { 165 MetaData rc = new MetaData(this.store); 166 rc.read(dataIn); 167 return rc; 168 } 169 170 public void writePayload(MetaData object, DataOutput dataOut) throws IOException { 171 object.write(dataOut); 172 } 173 } 174 175 class PListMarshaller extends VariableMarshaller<PListImpl> { 176 private final PListStoreImpl store; 177 PListMarshaller(PListStoreImpl store) { 178 this.store = store; 179 } 180 public PListImpl readPayload(DataInput dataIn) throws IOException { 181 PListImpl result = new PListImpl(this.store); 182 result.read(dataIn); 183 return result; 184 } 185 186 public void writePayload(PListImpl list, DataOutput dataOut) throws IOException { 187 list.write(dataOut); 188 } 189 } 190 191 public Journal getJournal() { 192 return this.journal; 193 } 194 195 @Override 196 public File getDirectory() { 197 return directory; 198 } 199 200 @Override 201 public void setDirectory(File directory) { 202 this.directory = directory; 203 } 204 205 public long size() { 206 synchronized (this) { 207 if (!initialized) { 208 return 0; 209 } 210 } 211 try { 212 return journal.getDiskSize() + pageFile.getDiskSize(); 213 } catch (IOException e) { 214 throw new RuntimeException(e); 215 } 216 } 217 218 @Override 219 public PListImpl getPList(final String name) throws Exception { 220 if (!isStarted()) { 221 throw new IllegalStateException("Not started"); 222 } 223 intialize(); 224 synchronized (indexLock) { 225 synchronized (this) { 226 PListImpl result = this.persistentLists.get(name); 227 if (result == null) { 228 final PListImpl pl = new PListImpl(this); 229 pl.setName(name); 230 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 231 public void execute(Transaction tx) throws IOException { 232 pl.setHeadPageId(tx.allocate().getPageId()); 233 pl.load(tx); 234 metaData.lists.put(tx, name, pl); 235 } 236 }); 237 result = pl; 238 this.persistentLists.put(name, pl); 239 } 240 final PListImpl toLoad = result; 241 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 242 public void execute(Transaction tx) throws IOException { 243 toLoad.load(tx); 244 } 245 }); 246 247 return result; 248 } 249 } 250 } 251 252 @Override 253 public boolean removePList(final String name) throws Exception { 254 boolean result = false; 255 synchronized (indexLock) { 256 synchronized (this) { 257 final PList pl = this.persistentLists.remove(name); 258 result = pl != null; 259 if (result) { 260 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 261 public void execute(Transaction tx) throws IOException { 262 metaData.lists.remove(tx, name); 263 pl.destroy(); 264 } 265 }); 266 } 267 } 268 } 269 return result; 270 } 271 272 protected synchronized void intialize() throws Exception { 273 if (isStarted()) { 274 if (this.initialized == false) { 275 if (this.directory == null) { 276 this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); 277 } 278 IOHelper.mkdirs(this.directory); 279 lock(); 280 this.journal = new Journal(); 281 this.journal.setDirectory(directory); 282 this.journal.setMaxFileLength(getJournalMaxFileLength()); 283 this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); 284 this.journal.start(); 285 this.pageFile = new PageFile(directory, "tmpDB"); 286 this.pageFile.setEnablePageCaching(getIndexEnablePageCaching()); 287 this.pageFile.setPageSize(getIndexPageSize()); 288 this.pageFile.setWriteBatchSize(getIndexWriteBatchSize()); 289 this.pageFile.setPageCacheSize(getIndexCacheSize()); 290 this.pageFile.load(); 291 292 this.pageFile.tx().execute(new Transaction.Closure<IOException>() { 293 public void execute(Transaction tx) throws IOException { 294 if (pageFile.getPageCount() == 0) { 295 Page<MetaData> page = tx.allocate(); 296 assert page.getPageId() == 0; 297 page.set(metaData); 298 metaData.page = page; 299 metaData.createIndexes(tx); 300 tx.store(metaData.page, metaDataMarshaller, true); 301 302 } else { 303 Page<MetaData> page = tx.load(0, metaDataMarshaller); 304 metaData = page.get(); 305 metaData.page = page; 306 } 307 metaData.load(tx); 308 metaData.loadLists(tx, persistentLists); 309 } 310 }); 311 this.pageFile.flush(); 312 313 if (cleanupInterval > 0) { 314 if (scheduler == null) { 315 scheduler = new Scheduler(PListStoreImpl.class.getSimpleName()); 316 scheduler.start(); 317 } 318 scheduler.executePeriodically(this, cleanupInterval); 319 } 320 this.initialized = true; 321 LOG.info(this + " initialized"); 322 } 323 } 324 } 325 326 @Override 327 protected synchronized void doStart() throws Exception { 328 if (!lazyInit) { 329 intialize(); 330 } 331 LOG.info(this + " started"); 332 } 333 334 @Override 335 protected synchronized void doStop(ServiceStopper stopper) throws Exception { 336 if (scheduler != null) { 337 if (PListStoreImpl.class.getSimpleName().equals(scheduler.getName())) { 338 scheduler.stop(); 339 scheduler = null; 340 } 341 } 342 for (PListImpl pl : this.persistentLists.values()) { 343 pl.unload(null); 344 } 345 if (this.pageFile != null) { 346 this.pageFile.unload(); 347 } 348 if (this.journal != null) { 349 journal.close(); 350 } 351 if (this.lockFile != null) { 352 this.lockFile.unlock(); 353 } 354 this.lockFile = null; 355 this.initialized = false; 356 LOG.info(this + " stopped"); 357 358 } 359 360 public void run() { 361 try { 362 if (isStopping()) { 363 return; 364 } 365 final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId(); 366 final Set<Integer> candidates = journal.getFileMap().keySet(); 367 LOG.trace("Full gc candidate set:" + candidates); 368 if (candidates.size() > 1) { 369 // prune current write 370 for (Iterator<Integer> iterator = candidates.iterator(); iterator.hasNext();) { 371 if (iterator.next() >= lastJournalFileId) { 372 iterator.remove(); 373 } 374 } 375 List<PListImpl> plists = null; 376 synchronized (indexLock) { 377 synchronized (this) { 378 plists = new ArrayList<PListImpl>(persistentLists.values()); 379 } 380 } 381 for (PListImpl list : plists) { 382 list.claimFileLocations(candidates); 383 if (isStopping()) { 384 return; 385 } 386 LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates); 387 } 388 LOG.trace("GC Candidate set:" + candidates); 389 this.journal.removeDataFiles(candidates); 390 } 391 } catch (IOException e) { 392 LOG.error("Exception on periodic cleanup: " + e, e); 393 } 394 } 395 396 ByteSequence getPayload(Location location) throws IllegalStateException, IOException { 397 ByteSequence result = null; 398 result = this.journal.read(location); 399 return result; 400 } 401 402 Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException { 403 return this.journal.write(payload, sync); 404 } 405 406 private void lock() throws IOException { 407 if (lockFile == null) { 408 File lockFileName = new File(directory, "lock"); 409 lockFile = new LockFile(lockFileName, true); 410 if (failIfDatabaseIsLocked) { 411 lockFile.lock(); 412 } else { 413 while (true) { 414 try { 415 lockFile.lock(); 416 break; 417 } catch (IOException e) { 418 LOG.info("Database " + lockFileName + " is locked... waiting " 419 + (DATABASE_LOCKED_WAIT_DELAY / 1000) 420 + " seconds for the database to be unlocked. Reason: " + e); 421 try { 422 Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); 423 } catch (InterruptedException e1) { 424 } 425 } 426 } 427 } 428 } 429 } 430 431 PageFile getPageFile() { 432 this.pageFile.isLoaded(); 433 return this.pageFile; 434 } 435 436 public boolean isFailIfDatabaseIsLocked() { 437 return failIfDatabaseIsLocked; 438 } 439 440 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 441 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 442 } 443 444 public int getJournalMaxFileLength() { 445 return journalMaxFileLength; 446 } 447 448 public void setJournalMaxFileLength(int journalMaxFileLength) { 449 this.journalMaxFileLength = journalMaxFileLength; 450 } 451 452 public int getJournalMaxWriteBatchSize() { 453 return journalMaxWriteBatchSize; 454 } 455 456 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 457 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 458 } 459 460 public boolean isEnableIndexWriteAsync() { 461 return enableIndexWriteAsync; 462 } 463 464 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 465 this.enableIndexWriteAsync = enableIndexWriteAsync; 466 } 467 468 public long getCleanupInterval() { 469 return cleanupInterval; 470 } 471 472 public void setCleanupInterval(long cleanupInterval) { 473 this.cleanupInterval = cleanupInterval; 474 } 475 476 public boolean isLazyInit() { 477 return lazyInit; 478 } 479 480 public void setLazyInit(boolean lazyInit) { 481 this.lazyInit = lazyInit; 482 } 483 484 @Override 485 public String toString() { 486 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET"; 487 return "PListStore:[" + path + "]"; 488 } 489}