001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.scheduler; 018 019import java.io.DataInput; 020import java.io.DataOutput; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.CopyOnWriteArrayList; 027import java.util.concurrent.atomic.AtomicBoolean; 028 029import javax.jms.MessageFormatException; 030 031import org.apache.activemq.broker.scheduler.CronParser; 032import org.apache.activemq.broker.scheduler.Job; 033import org.apache.activemq.broker.scheduler.JobListener; 034import org.apache.activemq.broker.scheduler.JobScheduler; 035import org.apache.activemq.protobuf.Buffer; 036import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand; 037import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand; 038import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand; 039import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand; 040import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; 041import org.apache.activemq.store.kahadb.disk.journal.Location; 042import org.apache.activemq.store.kahadb.disk.page.Transaction; 043import org.apache.activemq.store.kahadb.disk.util.LongMarshaller; 044import org.apache.activemq.util.ByteSequence; 045import org.apache.activemq.util.IdGenerator; 046import org.apache.activemq.util.ServiceStopper; 047import org.apache.activemq.util.ServiceSupport; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler { 052 053 private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class); 054 private final JobSchedulerStoreImpl store; 055 private final AtomicBoolean running = new AtomicBoolean(); 056 private String name; 057 private BTreeIndex<Long, List<JobLocation>> index; 058 private Thread thread; 059 private final AtomicBoolean started = new AtomicBoolean(false); 060 private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>(); 061 private static final IdGenerator ID_GENERATOR = new IdGenerator(); 062 private final ScheduleTime scheduleTime = new ScheduleTime(); 063 064 JobSchedulerImpl(JobSchedulerStoreImpl store) { 065 this.store = store; 066 } 067 068 public void setName(String name) { 069 this.name = name; 070 } 071 072 @Override 073 public String getName() { 074 return this.name; 075 } 076 077 @Override 078 public void addListener(JobListener l) { 079 this.jobListeners.add(l); 080 } 081 082 @Override 083 public void removeListener(JobListener l) { 084 this.jobListeners.remove(l); 085 } 086 087 @Override 088 public void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException { 089 doSchedule(jobId, payload, "", 0, delay, 0); 090 } 091 092 @Override 093 public void schedule(final String jobId, final ByteSequence payload, final String cronEntry) throws Exception { 094 doSchedule(jobId, payload, cronEntry, 0, 0, 0); 095 } 096 097 @Override 098 public void schedule(final String jobId, final ByteSequence payload, final String cronEntry, final long delay, final long period, final int repeat) throws IOException { 099 doSchedule(jobId, payload, cronEntry, delay, period, repeat); 100 } 101 102 @Override 103 public void remove(final long time) throws IOException { 104 doRemoveRange(time, time); 105 } 106 107 @Override 108 public void remove(final String jobId) throws IOException { 109 doRemove(-1, jobId); 110 } 111 112 @Override 113 public void removeAllJobs() throws IOException { 114 doRemoveRange(0, Long.MAX_VALUE); 115 } 116 117 @Override 118 public void removeAllJobs(final long start, final long finish) throws IOException { 119 doRemoveRange(start, finish); 120 } 121 122 @Override 123 public long getNextScheduleTime() throws IOException { 124 this.store.readLockIndex(); 125 try { 126 Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx()); 127 return first != null ? first.getKey() : -1l; 128 } finally { 129 this.store.readUnlockIndex(); 130 } 131 } 132 133 @Override 134 public List<Job> getNextScheduleJobs() throws IOException { 135 final List<Job> result = new ArrayList<Job>(); 136 this.store.readLockIndex(); 137 try { 138 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 139 @Override 140 public void execute(Transaction tx) throws IOException { 141 Map.Entry<Long, List<JobLocation>> first = index.getFirst(tx); 142 if (first != null) { 143 for (JobLocation jl : first.getValue()) { 144 ByteSequence bs = getPayload(jl.getLocation()); 145 Job job = new JobImpl(jl, bs); 146 result.add(job); 147 } 148 } 149 } 150 }); 151 } finally { 152 this.store.readUnlockIndex(); 153 } 154 return result; 155 } 156 157 private Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException { 158 this.store.readLockIndex(); 159 try { 160 if (!this.store.isStopped() && !this.store.isStopping()) { 161 Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx()); 162 return first; 163 } 164 } finally { 165 this.store.readUnlockIndex(); 166 } 167 return null; 168 } 169 170 @Override 171 public List<Job> getAllJobs() throws IOException { 172 final List<Job> result = new ArrayList<Job>(); 173 this.store.readLockIndex(); 174 try { 175 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 176 @Override 177 public void execute(Transaction tx) throws IOException { 178 Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx()); 179 while (iter.hasNext()) { 180 Map.Entry<Long, List<JobLocation>> next = iter.next(); 181 if (next != null) { 182 for (JobLocation jl : next.getValue()) { 183 ByteSequence bs = getPayload(jl.getLocation()); 184 Job job = new JobImpl(jl, bs); 185 result.add(job); 186 } 187 } else { 188 break; 189 } 190 } 191 } 192 }); 193 } finally { 194 this.store.readUnlockIndex(); 195 } 196 return result; 197 } 198 199 @Override 200 public List<Job> getAllJobs(final long start, final long finish) throws IOException { 201 final List<Job> result = new ArrayList<Job>(); 202 this.store.readLockIndex(); 203 try { 204 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 205 @Override 206 public void execute(Transaction tx) throws IOException { 207 Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(tx, start); 208 while (iter.hasNext()) { 209 Map.Entry<Long, List<JobLocation>> next = iter.next(); 210 if (next != null && next.getKey().longValue() <= finish) { 211 for (JobLocation jl : next.getValue()) { 212 ByteSequence bs = getPayload(jl.getLocation()); 213 Job job = new JobImpl(jl, bs); 214 result.add(job); 215 } 216 } else { 217 break; 218 } 219 } 220 } 221 }); 222 } finally { 223 this.store.readUnlockIndex(); 224 } 225 return result; 226 } 227 228 private void doSchedule(final String jobId, final ByteSequence payload, final String cronEntry, long delay, long period, int repeat) throws IOException { 229 long startTime = System.currentTimeMillis(); 230 // round startTime - so we can schedule more jobs 231 // at the same time 232 startTime = (startTime / 1000) * 1000; 233 long time = 0; 234 if (cronEntry != null && cronEntry.length() > 0) { 235 try { 236 time = CronParser.getNextScheduledTime(cronEntry, startTime); 237 } catch (MessageFormatException e) { 238 throw new IOException(e.getMessage()); 239 } 240 } 241 242 if (time == 0) { 243 // start time not set by CRON - so it it to the current time 244 time = startTime; 245 } 246 247 if (delay > 0) { 248 time += delay; 249 } else { 250 time += period; 251 } 252 253 KahaAddScheduledJobCommand newJob = new KahaAddScheduledJobCommand(); 254 newJob.setScheduler(name); 255 newJob.setJobId(jobId); 256 newJob.setStartTime(startTime); 257 newJob.setCronEntry(cronEntry); 258 newJob.setDelay(delay); 259 newJob.setPeriod(period); 260 newJob.setRepeat(repeat); 261 newJob.setNextExecutionTime(time); 262 newJob.setPayload(new Buffer(payload.getData(), payload.getOffset(), payload.getLength())); 263 264 this.store.store(newJob); 265 } 266 267 private void doReschedule(final String jobId, long executionTime, long nextExecutionTime, int rescheduledCount) throws IOException { 268 KahaRescheduleJobCommand update = new KahaRescheduleJobCommand(); 269 update.setScheduler(name); 270 update.setJobId(jobId); 271 update.setExecutionTime(executionTime); 272 update.setNextExecutionTime(nextExecutionTime); 273 update.setRescheduledCount(rescheduledCount); 274 this.store.store(update); 275 } 276 277 private void doRemove(final long executionTime, final List<JobLocation> jobs) throws IOException { 278 for (JobLocation job : jobs) { 279 doRemove(executionTime, job.getJobId()); 280 } 281 } 282 283 private void doRemove(long executionTime, final String jobId) throws IOException { 284 KahaRemoveScheduledJobCommand remove = new KahaRemoveScheduledJobCommand(); 285 remove.setScheduler(name); 286 remove.setJobId(jobId); 287 remove.setNextExecutionTime(executionTime); 288 this.store.store(remove); 289 } 290 291 private void doRemoveRange(long start, long end) throws IOException { 292 KahaRemoveScheduledJobsCommand destroy = new KahaRemoveScheduledJobsCommand(); 293 destroy.setScheduler(name); 294 destroy.setStartTime(start); 295 destroy.setEndTime(end); 296 this.store.store(destroy); 297 } 298 299 /** 300 * Adds a new Scheduled job to the index. Must be called under index lock. 301 * 302 * This method must ensure that a duplicate add is not processed into the scheduler. On index 303 * recover some adds may be replayed and we don't allow more than one instance of a JobId to 304 * exist at any given scheduled time, so filter these out to ensure idempotence. 305 * 306 * @param tx 307 * Transaction in which the update is performed. 308 * @param command 309 * The new scheduled job command to process. 310 * @param location 311 * The location where the add command is stored in the journal. 312 * 313 * @throws IOException if an error occurs updating the index. 314 */ 315 protected void process(final Transaction tx, final KahaAddScheduledJobCommand command, Location location) throws IOException { 316 JobLocation jobLocation = new JobLocation(location); 317 jobLocation.setJobId(command.getJobId()); 318 jobLocation.setStartTime(command.getStartTime()); 319 jobLocation.setCronEntry(command.getCronEntry()); 320 jobLocation.setDelay(command.getDelay()); 321 jobLocation.setPeriod(command.getPeriod()); 322 jobLocation.setRepeat(command.getRepeat()); 323 324 long nextExecutionTime = command.getNextExecutionTime(); 325 326 List<JobLocation> values = null; 327 jobLocation.setNextTime(nextExecutionTime); 328 if (this.index.containsKey(tx, nextExecutionTime)) { 329 values = this.index.remove(tx, nextExecutionTime); 330 } 331 if (values == null) { 332 values = new ArrayList<JobLocation>(); 333 } 334 335 // There can never be more than one instance of the same JobId scheduled at any 336 // given time, when it happens its probably the result of index recovery and this 337 // method must be idempotent so check for it first. 338 if (!values.contains(jobLocation)) { 339 values.add(jobLocation); 340 341 // Reference the log file where the add command is stored to prevent GC. 342 this.store.incrementJournalCount(tx, location); 343 this.index.put(tx, nextExecutionTime, values); 344 this.scheduleTime.newJob(); 345 } else { 346 this.index.put(tx, nextExecutionTime, values); 347 LOG.trace("Job {} already in scheduler at this time {}", 348 jobLocation.getJobId(), jobLocation.getNextTime()); 349 } 350 } 351 352 /** 353 * Reschedules a Job after it has be fired. 354 * 355 * For jobs that are repeating this method updates the job in the index by adding it to the 356 * jobs list for the new execution time. If the job is not a cron type job then this method 357 * will reduce the repeat counter if the job has a fixed number of repeats set. The Job will 358 * be removed from the jobs list it just executed on. 359 * 360 * This method must also update the value of the last update location in the JobLocation 361 * instance so that the checkpoint worker doesn't drop the log file in which that command lives. 362 * 363 * This method must ensure that an reschedule command that references a job that doesn't exist 364 * does not cause an error since it's possible that on recover the original add might be gone 365 * and so the job should not reappear in the scheduler. 366 * 367 * @param tx 368 * The TX under which the index is updated. 369 * @param command 370 * The reschedule command to process. 371 * @param location 372 * The location in the index where the reschedule command was stored. 373 * 374 * @throws IOException if an error occurs during the reschedule. 375 */ 376 protected void process(final Transaction tx, final KahaRescheduleJobCommand command, Location location) throws IOException { 377 JobLocation result = null; 378 final List<JobLocation> current = this.index.remove(tx, command.getExecutionTime()); 379 if (current != null) { 380 for (int i = 0; i < current.size(); i++) { 381 JobLocation jl = current.get(i); 382 if (jl.getJobId().equals(command.getJobId())) { 383 current.remove(i); 384 if (!current.isEmpty()) { 385 this.index.put(tx, command.getExecutionTime(), current); 386 } 387 result = jl; 388 break; 389 } 390 } 391 } else { 392 LOG.debug("Process reschedule command for job {} non-existent executime time {}.", 393 command.getJobId(), command.getExecutionTime()); 394 } 395 396 if (result != null) { 397 Location previousUpdate = result.getLastUpdate(); 398 399 List<JobLocation> target = null; 400 result.setNextTime(command.getNextExecutionTime()); 401 result.setLastUpdate(location); 402 result.setRescheduledCount(command.getRescheduledCount()); 403 if (!result.isCron() && result.getRepeat() > 0) { 404 result.setRepeat(result.getRepeat() - 1); 405 } 406 if (this.index.containsKey(tx, command.getNextExecutionTime())) { 407 target = this.index.remove(tx, command.getNextExecutionTime()); 408 } 409 if (target == null) { 410 target = new ArrayList<JobLocation>(); 411 } 412 target.add(result); 413 414 // Track the location of the last reschedule command and release the log file 415 // reference for the previous one if there was one. 416 this.store.incrementJournalCount(tx, location); 417 if (previousUpdate != null) { 418 this.store.decrementJournalCount(tx, previousUpdate); 419 } 420 421 this.index.put(tx, command.getNextExecutionTime(), target); 422 this.scheduleTime.newJob(); 423 } else { 424 LOG.debug("Process reschedule command for non-scheduled job {} at executime time {}.", 425 command.getJobId(), command.getExecutionTime()); 426 } 427 } 428 429 /** 430 * Removes a scheduled job from the scheduler. 431 * 432 * The remove operation can be of two forms. The first is that there is a job Id but no set time 433 * (-1) in which case the jobs index is searched until the target job Id is located. The alternate 434 * form is that a job Id and execution time are both set in which case the given time is checked 435 * for a job matching that Id. In either case once an execution time is identified the job is 436 * removed and the index updated. 437 * 438 * This method should ensure that if the matching job is not found that no error results as it 439 * is possible that on a recover the initial add command could be lost so the job may not be 440 * rescheduled. 441 * 442 * @param tx 443 * The transaction under which the index is updated. 444 * @param command 445 * The remove command to process. 446 * @param location 447 * The location of the remove command in the Journal. 448 * 449 * @throws IOException if an error occurs while updating the scheduler index. 450 */ 451 void process(final Transaction tx, final KahaRemoveScheduledJobCommand command, Location location) throws IOException { 452 453 // Case 1: JobId and no time value means find the job and remove it. 454 // Case 2: JobId and a time value means find exactly this scheduled job. 455 456 Long executionTime = command.getNextExecutionTime(); 457 458 List<JobLocation> values = null; 459 460 if (executionTime == -1) { 461 for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) { 462 Map.Entry<Long, List<JobLocation>> entry = i.next(); 463 List<JobLocation> candidates = entry.getValue(); 464 if (candidates != null) { 465 for (JobLocation jl : candidates) { 466 if (jl.getJobId().equals(command.getJobId())) { 467 LOG.trace("Entry {} contains the remove target: {}", entry.getKey(), command.getJobId()); 468 executionTime = entry.getKey(); 469 values = this.index.remove(tx, executionTime); 470 break; 471 } 472 } 473 } 474 } 475 } else { 476 values = this.index.remove(tx, executionTime); 477 } 478 479 JobLocation removed = null; 480 481 // Remove the job and update the index if there are any other jobs scheduled at this time. 482 if (values != null) { 483 for (JobLocation job : values) { 484 if (job.getJobId().equals(command.getJobId())) { 485 removed = job; 486 values.remove(removed); 487 break; 488 } 489 } 490 491 if (!values.isEmpty()) { 492 this.index.put(tx, executionTime, values); 493 } 494 } 495 496 if (removed != null) { 497 LOG.trace("{} removed from scheduler {}", removed, this); 498 499 // Remove the references for add and reschedule commands for this job 500 // so that those logs can be GC'd when free. 501 this.store.decrementJournalCount(tx, removed.getLocation()); 502 if (removed.getLastUpdate() != null) { 503 this.store.decrementJournalCount(tx, removed.getLastUpdate()); 504 } 505 506 // now that the job is removed from the index we can store the remove info and 507 // then dereference the log files that hold the initial add command and the most 508 // recent update command. 509 this.store.referenceRemovedLocation(tx, location, removed); 510 } 511 } 512 513 /** 514 * Removes all scheduled jobs within a given time range. 515 * 516 * The method can be used to clear the entire scheduler index by specifying a range that 517 * encompasses all time [0...Long.MAX_VALUE] or a single execution time can be removed by 518 * setting start and end time to the same value. 519 * 520 * @param tx 521 * The transaction under which the index is updated. 522 * @param command 523 * The remove command to process. 524 * @param location 525 * The location of the remove command in the Journal. 526 * 527 * @throws IOException if an error occurs while updating the scheduler index. 528 */ 529 protected void process(final Transaction tx, final KahaRemoveScheduledJobsCommand command, Location location) throws IOException { 530 removeInRange(tx, command.getStartTime(), command.getEndTime(), location); 531 } 532 533 /** 534 * Removes all jobs from the schedulers index. Must be called with the index locked. 535 * 536 * @param tx 537 * The transaction under which the index entries for this scheduler are removed. 538 * 539 * @throws IOException if an error occurs removing the jobs from the scheduler index. 540 */ 541 protected void removeAll(Transaction tx) throws IOException { 542 this.removeInRange(tx, 0, Long.MAX_VALUE, null); 543 } 544 545 /** 546 * Removes all scheduled jobs within the target range. 547 * 548 * This method can be used to remove all the stored jobs by passing a range of [0...Long.MAX_VALUE] 549 * or it can be used to remove all jobs at a given scheduled time by passing the same time value 550 * for both start and end. If the optional location parameter is set then this method will update 551 * the store's remove location tracker with the location value and the Jobs that are being removed. 552 * 553 * This method must be called with the store index locked for writes. 554 * 555 * @param tx 556 * The transaction under which the index is to be updated. 557 * @param start 558 * The start time for the remove operation. 559 * @param finish 560 * The end time for the remove operation. 561 * @param location (optional) 562 * The location of the remove command that triggered this remove. 563 * 564 * @throws IOException if an error occurs during the remove operation. 565 */ 566 protected void removeInRange(Transaction tx, long start, long finish, Location location) throws IOException { 567 List<Long> keys = new ArrayList<Long>(); 568 for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx, start); i.hasNext();) { 569 Map.Entry<Long, List<JobLocation>> entry = i.next(); 570 if (entry.getKey().longValue() <= finish) { 571 keys.add(entry.getKey()); 572 } else { 573 break; 574 } 575 } 576 577 for (Long executionTime : keys) { 578 List<JobLocation> values = this.index.remove(tx, executionTime); 579 if (location != null) { 580 for (JobLocation job : values) { 581 LOG.trace("Removing {} scheduled at: {}", job, executionTime); 582 583 // Remove the references for add and reschedule commands for this job 584 // so that those logs can be GC'd when free. 585 this.store.decrementJournalCount(tx, job.getLocation()); 586 if (job.getLastUpdate() != null) { 587 this.store.decrementJournalCount(tx, job.getLastUpdate()); 588 } 589 590 // now that the job is removed from the index we can store the remove info and 591 // then dereference the log files that hold the initial add command and the most 592 // recent update command. 593 this.store.referenceRemovedLocation(tx, location, job); 594 } 595 } 596 } 597 } 598 599 /** 600 * Removes a Job from the index using it's Id value and the time it is currently set to 601 * be executed. This method will only remove the Job if it is found at the given execution 602 * time. 603 * 604 * This method must be called under index lock. 605 * 606 * @param tx 607 * the transaction under which this method is being executed. 608 * @param jobId 609 * the target Job Id to remove. 610 * @param executionTime 611 * the scheduled time that for the Job Id that is being removed. 612 * 613 * @returns true if the Job was removed or false if not found at the given time. 614 * 615 * @throws IOException if an error occurs while removing the Job. 616 */ 617 protected boolean removeJobAtTime(Transaction tx, String jobId, long executionTime) throws IOException { 618 boolean result = false; 619 620 List<JobLocation> jobs = this.index.remove(tx, executionTime); 621 Iterator<JobLocation> jobsIter = jobs.iterator(); 622 while (jobsIter.hasNext()) { 623 JobLocation job = jobsIter.next(); 624 if (job.getJobId().equals(jobId)) { 625 jobsIter.remove(); 626 // Remove the references for add and reschedule commands for this job 627 // so that those logs can be GC'd when free. 628 this.store.decrementJournalCount(tx, job.getLocation()); 629 if (job.getLastUpdate() != null) { 630 this.store.decrementJournalCount(tx, job.getLastUpdate()); 631 } 632 result = true; 633 break; 634 } 635 } 636 637 // Return the list to the index modified or unmodified. 638 this.index.put(tx, executionTime, jobs); 639 640 return result; 641 } 642 643 /** 644 * Walks the Scheduled Job Tree and collects the add location and last update location 645 * for all scheduled jobs. 646 * 647 * This method must be called with the index locked. 648 * 649 * @param tx 650 * the transaction under which this operation was invoked. 651 * 652 * @return a list of all referenced Location values for this JobSchedulerImpl 653 * 654 * @throws IOException if an error occurs walking the scheduler tree. 655 */ 656 protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException { 657 List<JobLocation> references = new ArrayList<JobLocation>(); 658 659 for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) { 660 Map.Entry<Long, List<JobLocation>> entry = i.next(); 661 List<JobLocation> scheduled = entry.getValue(); 662 for (JobLocation job : scheduled) { 663 references.add(job); 664 } 665 } 666 667 return references; 668 } 669 670 @Override 671 public void run() { 672 try { 673 mainLoop(); 674 } catch (Throwable e) { 675 if (this.running.get() && isStarted()) { 676 LOG.error("{} Caught exception in mainloop", this, e); 677 } 678 } finally { 679 if (running.get()) { 680 try { 681 stop(); 682 } catch (Exception e) { 683 LOG.error("Failed to stop {}", this); 684 } 685 } 686 } 687 } 688 689 @Override 690 public String toString() { 691 return "JobScheduler: " + this.name; 692 } 693 694 protected void mainLoop() { 695 while (this.running.get()) { 696 this.scheduleTime.clearNewJob(); 697 try { 698 long currentTime = System.currentTimeMillis(); 699 700 // Read the list of scheduled events and fire the jobs, reschedule repeating jobs as 701 // needed before firing the job event. 702 Map.Entry<Long, List<JobLocation>> first = getNextToSchedule(); 703 if (first != null) { 704 List<JobLocation> list = new ArrayList<JobLocation>(first.getValue()); 705 List<JobLocation> toRemove = new ArrayList<JobLocation>(list.size()); 706 final long executionTime = first.getKey(); 707 long nextExecutionTime = 0; 708 if (executionTime <= currentTime) { 709 for (final JobLocation job : list) { 710 711 if (!running.get()) { 712 break; 713 } 714 715 int repeat = job.getRepeat(); 716 nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat); 717 long waitTime = nextExecutionTime - currentTime; 718 this.scheduleTime.setWaitTime(waitTime); 719 if (!job.isCron()) { 720 fireJob(job); 721 if (repeat != 0) { 722 // Reschedule for the next time, the scheduler will take care of 723 // updating the repeat counter on the update. 724 doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1); 725 } else { 726 toRemove.add(job); 727 } 728 } else { 729 if (repeat == 0) { 730 // This is a non-repeating Cron entry so we can fire and forget it. 731 fireJob(job); 732 } 733 734 if (nextExecutionTime > currentTime) { 735 // Reschedule the cron job as a new event, if the cron entry signals 736 // a repeat then it will be stored separately and fired as a normal 737 // event with decrementing repeat. 738 doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1); 739 740 if (repeat != 0) { 741 // we have a separate schedule to run at this time 742 // so the cron job is used to set of a separate schedule 743 // hence we won't fire the original cron job to the 744 // listeners but we do need to start a separate schedule 745 String jobId = ID_GENERATOR.generateId(); 746 ByteSequence payload = getPayload(job.getLocation()); 747 schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat()); 748 waitTime = job.getDelay() != 0 ? job.getDelay() : job.getPeriod(); 749 this.scheduleTime.setWaitTime(waitTime); 750 } 751 } else { 752 toRemove.add(job); 753 } 754 } 755 } 756 757 // now remove all jobs that have not been rescheduled from this execution 758 // time, if there are no more entries in that time it will be removed. 759 doRemove(executionTime, toRemove); 760 761 // If there is a job that should fire before the currently set wait time 762 // we need to reset wait time otherwise we'll miss it. 763 Map.Entry<Long, List<JobLocation>> nextUp = getNextToSchedule(); 764 if (nextUp != null) { 765 final long timeUntilNextScheduled = nextUp.getKey() - currentTime; 766 if (timeUntilNextScheduled < this.scheduleTime.getWaitTime()) { 767 this.scheduleTime.setWaitTime(timeUntilNextScheduled); 768 } 769 } 770 } else { 771 this.scheduleTime.setWaitTime(executionTime - currentTime); 772 } 773 } 774 775 this.scheduleTime.pause(); 776 } catch (Exception ioe) { 777 LOG.error("{} Failed to schedule job", this.name, ioe); 778 try { 779 this.store.stop(); 780 } catch (Exception e) { 781 LOG.error("{} Failed to shutdown JobSchedulerStore", this.name, e); 782 } 783 } 784 } 785 } 786 787 void fireJob(JobLocation job) throws IllegalStateException, IOException { 788 LOG.debug("Firing: {}", job); 789 ByteSequence bs = this.store.getPayload(job.getLocation()); 790 for (JobListener l : jobListeners) { 791 l.scheduledJob(job.getJobId(), bs); 792 } 793 } 794 795 @Override 796 public void startDispatching() throws Exception { 797 if (!this.running.get()) { 798 return; 799 } 800 801 if (started.compareAndSet(false, true)) { 802 this.thread = new Thread(this, "JobScheduler:" + this.name); 803 this.thread.setDaemon(true); 804 this.thread.start(); 805 } 806 } 807 808 @Override 809 public void stopDispatching() throws Exception { 810 if (started.compareAndSet(true, false)) { 811 this.scheduleTime.wakeup(); 812 Thread t = this.thread; 813 this.thread = null; 814 if (t != null) { 815 t.join(3000); 816 } 817 } 818 } 819 820 @Override 821 protected void doStart() throws Exception { 822 this.running.set(true); 823 } 824 825 @Override 826 protected void doStop(ServiceStopper stopper) throws Exception { 827 this.running.set(false); 828 stopDispatching(); 829 } 830 831 private ByteSequence getPayload(Location location) throws IllegalStateException, IOException { 832 return this.store.getPayload(location); 833 } 834 835 long calculateNextExecutionTime(final JobLocation job, long currentTime, int repeat) throws MessageFormatException { 836 long result = currentTime; 837 String cron = job.getCronEntry(); 838 if (cron != null && cron.length() > 0) { 839 result = CronParser.getNextScheduledTime(cron, result); 840 } else if (job.getRepeat() != 0) { 841 result += job.getPeriod(); 842 } 843 return result; 844 } 845 846 void createIndexes(Transaction tx) throws IOException { 847 this.index = new BTreeIndex<Long, List<JobLocation>>(this.store.getPageFile(), tx.allocate().getPageId()); 848 } 849 850 void load(Transaction tx) throws IOException { 851 this.index.setKeyMarshaller(LongMarshaller.INSTANCE); 852 this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE); 853 this.index.load(tx); 854 } 855 856 void read(DataInput in) throws IOException { 857 this.name = in.readUTF(); 858 this.index = new BTreeIndex<Long, List<JobLocation>>(this.store.getPageFile(), in.readLong()); 859 this.index.setKeyMarshaller(LongMarshaller.INSTANCE); 860 this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE); 861 } 862 863 public void write(DataOutput out) throws IOException { 864 out.writeUTF(name); 865 out.writeLong(this.index.getPageId()); 866 } 867 868 static class ScheduleTime { 869 private final int DEFAULT_WAIT = 500; 870 private final int DEFAULT_NEW_JOB_WAIT = 100; 871 private boolean newJob; 872 private long waitTime = DEFAULT_WAIT; 873 private final Object mutex = new Object(); 874 875 /** 876 * @return the waitTime 877 */ 878 long getWaitTime() { 879 return this.waitTime; 880 } 881 882 /** 883 * @param waitTime 884 * the waitTime to set 885 */ 886 void setWaitTime(long waitTime) { 887 if (!this.newJob) { 888 this.waitTime = waitTime > 0 ? waitTime : DEFAULT_WAIT; 889 } 890 } 891 892 void pause() { 893 synchronized (mutex) { 894 try { 895 mutex.wait(this.waitTime); 896 } catch (InterruptedException e) { 897 } 898 } 899 } 900 901 void newJob() { 902 this.newJob = true; 903 this.waitTime = DEFAULT_NEW_JOB_WAIT; 904 wakeup(); 905 } 906 907 void clearNewJob() { 908 this.newJob = false; 909 } 910 911 void wakeup() { 912 synchronized (this.mutex) { 913 mutex.notifyAll(); 914 } 915 } 916 } 917}