java.util.concurrent.RejectedExecutionException at org.apache.camel.processor.MulticastProcessor

Solution Unverified - Updated -

Environment

  • Fuse Mediation Router

Issue

java.util.concurrent.RejectedExecutionException at org.apache.camel.processor.MulticastProcessor

ERROR | pool-flow.seda.servicemix-camel-thread-24 | DeadLetterChannel | Failed delivery for exchangeId: ..... On delivery attempt: 0 caught: java.util.concurrent.RejectedExecutionException
java.util.concurrent.RejectedExecutionException
at org.apache.camel.processor.MulticastProcessor$2.rejectedExecution(MulticastProcessor.java:247)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)
at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:172)
at org.apache.camel.processor.interceptor.StreamCachingInterceptor.proceed(StreamCachingInterceptor.java:90)
at org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:82)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:41)
at org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:69)
at org.apache.camel.processor.DelegateProcessor.processNext(DelegateProcessor.java:50)

Resolution

There are two solutions.

1) Increase the depth of the executor's work queue

ThreadPoolExecutor tpExecutor = new ThreadPoolExecutor(1, 10, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2048));

This executor needs to be set on your Camel route:

from("direct:parallel")
  .multicast(new BodyOutAggregatingStrategy(), true).executorService(tpExecutor)
  .to("direct:x", "direct:y", "direct:z");

OR ALTERNATIVELY

2) Install a RejectedExecutionHandler to re-queue the work item, for example:

// Define a handler that blocks the submitting thread until the queue has room
private static class WorkQueuePolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().put(runnable);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the task as rejected
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException(e);
        }
    }
}
// Install the handler
ThreadPoolExecutor tpExecutor = new ThreadPoolExecutor(1, 10, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2048));
tpExecutor.setRejectedExecutionHandler(new WorkQueuePolicy());
// Set the executor on the route
from("direct:parallel")
  .multicast(new BodyOutAggregatingStrategy(), true).executorService(tpExecutor)
  .to("direct:x", "direct:y", "direct:z");
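
The effect of such a handler can be checked outside Camel. The following is a minimal sketch, not taken from the product code: the class name BlockingHandlerDemo, the method runAll, the small pool and queue sizes, and the 5 ms task duration are all assumptions chosen for illustration. It submits more tasks than the pool and queue can hold at once and counts how many complete.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Camel or Fuse Mediation Router.
final class BlockingHandlerDemo {

    // Submits `tasks` short tasks and returns how many ran to completion.
    static int runAll(int tasks) {
        // Deliberately tiny pool (max 2 threads) and queue (depth 2)
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));

        // Same idea as WorkQueuePolicy: block the submitter until the queue has room
        executor.setRejectedExecutionHandler((runnable, pool) -> {
            try {
                pool.getQueue().put(runnable);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException(e);
            }
        });

        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(5); // simulate a small amount of work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }

        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAll(20));
    }
}
```

Because the handler blocks the submitting thread rather than throwing, a slow consumer slows the producer down. Note that this approach probably should not be combined with shutting the executor down while tasks are still being submitted, since a task put on the queue after shutdown may never run.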

Root Cause

This error occurs when the multicast processor handles requests in parallel. The processor has an associated ExecutorService that is backed by a work queue. If tasks arrive faster than the pool can run them, the queue fills up; once the queue is full and the pool is already at its maximum number of threads, further tasks are rejected with a RejectedExecutionException.
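
The rejection can be reproduced with a plain ThreadPoolExecutor, independent of Camel. The sketch below is illustrative only: the class name RejectionDemo, the method countRejected, and the pool and queue sizes are assumptions. Every task blocks on a latch, so nothing drains from the queue while tasks are being submitted. It uses the JDK's default rejection policy, which throws RejectedExecutionException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Camel or Fuse Mediation Router.
final class RejectionDemo {

    // Submits `tasks` blocking tasks and returns how many were rejected.
    static int countRejected(int maxThreads, int queueDepth, int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, maxThreads, 0, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueDepth));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task waits on the latch, so threads and queue slots stay occupied
                    executor.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // queue full and pool already at maxThreads
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 threads + 3 queue slots can hold 5 tasks; the other 5 of 10 are rejected
        System.out.println("rejected: " + countRejected(2, 3, 10));
    }
}
```

With these numbers the executor can hold at most maxThreads + queueDepth tasks at once, so raising the queue depth (solution 1) only moves the threshold, while a blocking handler (solution 2) removes the rejection at the cost of stalling the submitter.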

This solution is part of Red Hat’s fast-track publication program, providing a huge library of solutions that Red Hat engineers have created while supporting our customers. To give you the knowledge you need the instant it becomes available, these articles may be presented in a raw and unedited form.
