2

我们有一个高负载的 Apache Camel 应用程序,它利用 logback/MDC 来记录信息。我们发现一些 MDC 信息在线程上已经过时,正如 logback 文档中预先警告的那样。我发现这个 SO question 解决了这个问题:

如何将 MDC 与线程池一起使用?

我们应该如何将其应用于我们的骆驼应用程序以避免陈旧信息?如链接问题中所建议的那样,是否可以简单地将默认 ThreadPoolExecutor 全局更改为自定义变体?我看到您可以为池本身执行此操作,但没有看到执行者的任何示例。请记住,我们的应用程序非常大,并且每天处理大量订单——我希望对现有应用程序的影响尽可能小。

4

1 回答 1

3

我想通了,想发布我所做的,以防它使其他人受益。请注意我使用的是 JDK 6/camel2.13.2

  • Camel 有一个DefaultExecutorServiceManager使用DefaultThreadPoolFactory. 我将默认工厂扩展为MdcThreadPoolFactory

  • DefaultThreadPoolFactory具有生成RejectableThreadPoolExecutors 和s的方法RejectableScheduledThreadPoolExecutor。我将这两个版本都扩展为 Mdc* 版本,这些版本覆盖了execute()包装 Runnable 并在线程之间传递 MDC 信息的方法(如我原始问题中的链接所指定)。

  • 我在我的应用程序配置中创建了一个 bean 实例,该实例MdcThreadPoolFactory由 Camel 自动拾取并用于ExecutorServiceManager

MdcThreadPoolExecutor:

package com.mypackage.concurrent

import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by broda20.
 * Date: 10/29/15
 */
public class MdcThreadPoolExecutor extends RejectableThreadPoolExecutor {

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return MDC.getCopyOfContextMap();
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}

MdcScheduledThreadPoolExecutor:

package com.mypackage.concurrent

import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by broda20.
 * Date: 10/29/15
 */
public class MdcScheduledThreadPoolExecutor extends RejectableScheduledThreadPoolExecutor {

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return MDC.getCopyOfContextMap();
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, threadFactory);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
        super(corePoolSize, handler);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, threadFactory, handler);
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}

MdcThreadPoolFactory:

package com.mypackage.concurrent

import org.apache.camel.impl.DefaultThreadPoolFactory
import org.apache.camel.spi.ThreadPoolProfile
import org.apache.camel.util.concurrent.SizedScheduledExecutorService
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

public class MdcThreadPoolFactory extends DefaultThreadPoolFactory {

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return MDC.getCopyOfContextMap();
    }


    public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, boolean allowCoreThreadTimeOut,
                                             RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) throws IllegalArgumentException {

            // the core pool size must be 0 or higher
            if (corePoolSize < 0) {
               throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
            }

            // validate max >= core
            if (maxPoolSize < corePoolSize) {
                throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
            }

            BlockingQueue<Runnable> workQueue;
            if (corePoolSize == 0 && maxQueueSize <= 0) {
                // use a synchronous queue for direct-handover (no tasks stored on the queue)
                workQueue = new SynchronousQueue<Runnable>();
                // and force 1 as pool size to be able to create the thread pool by the JDK
                corePoolSize = 1;
                maxPoolSize = 1;
            } else if (maxQueueSize <= 0) {
                // use a synchronous queue for direct-handover (no tasks stored on the queue)
                workQueue = new SynchronousQueue<Runnable>();
            } else {
                // bounded task queue to store tasks on the queue
                workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
            }

            ThreadPoolExecutor answer = new MdcThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
            answer.setThreadFactory(threadFactory);
            answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
            if (rejectedExecutionHandler == null) {
                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
            }
            answer.setRejectedExecutionHandler(rejectedExecutionHandler);
            return answer;
        }

        @Override
        public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
            RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
            if (rejectedExecutionHandler == null) {
                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
            }

            ScheduledThreadPoolExecutor answer = new MdcScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
            //JDK7: answer.setRemoveOnCancelPolicy(true);

            // need to wrap the thread pool in a sized to guard against the problem that the
            // JDK created thread pool has an unbounded queue (see class javadoc), which mean
            // we could potentially keep adding tasks, and run out of memory.
            if (profile.getMaxPoolSize() > 0) {
                return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
            } else {
                return answer;
            }
        }
}

最后,bean 实例:

<bean id="mdcThreadPoolFactory" class="com.mypackage.concurrent.MdcThreadPoolFactory"/>
于 2015-10-29T19:13:41.500 回答