0

我必须在我公司的 CRM 解决方案(Oracle's Right Now)中查询我们的 60 万用户,如果存在则更新它们,如果不存在则创建它们。要知道用户现在是否已经存在,我使用了第三方 WS。对于 60 万用户来说,这可能是一个真正的痛苦,因为每次获得响应都需要时间(大约 1 秒)。所以我设法将我的代码更改为使用Parallel.ForEach,在 0.35 秒内查询每条记录,并将其添加List<User>到要创建或要更新的记录中(现在有点愚蠢,所以我需要将它们分成 2 个列表和调用 2 个不同的 WS 方法)。

我的代码在多线程之前可以完美运行,但耗时太长。问题是我无法使批次过大,或者当我尝试通过 Web 服务更新或创建时出现超时。所以我一次向他们发送大约 500 条记录,当它运行关键代码部分时,它会执行很多次。

Parallel.ForEach(boDS.USERS.AsEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = -1 }, row =>
{
    ...
    user = null;
    user = QueryUserById(row["USER_ID"].Trim());

    if (user == null)
    {
        isUpdate = false;
        gObject.ID = new ID();
    }
    else
    {
        isUpdate = true;
        gObject.ID = user.ID;
    }

    ... fill user attributes as generic fields ...

    gObject.GenericFields = listGenericFields.ToArray();

    if (isUpdate)
        listUserUpdate.Add(gObject);
    else
        listUserCreate.Add(gObject);

    if (i == batchSize - 1 || i == (boDS.USERS.Rows.Count - 1))
    {               
        UpdateProcessingOptions upo = new UpdateProcessingOptions();
        CreateProcessingOptions cpo = new CreateProcessingOptions();
        upo.SuppressExternalEvents = false;
        upo.SuppressRules = false;
        cpo.SuppressExternalEvents = false;
        cpo.SuppressRules = false;

        RNObject[] results = null;

        // <Critical_code>

        if (listUserCreate.Count > 0)
        {
            results = _service.Create(_clientInfoHeader, listUserCreate.ToArray(), cpo);
        }
        if (listUserUpdate.Count > 0)
        {
            _service.Update(_clientInfoHeader, listUserUpdate.ToArray(), upo);
        }
        // </Critical_code>

        listUserUpdate = new List<RNObject>();
        listUserCreate = new List<RNObject>();
    }
    i++;
});

我考虑过使用lockor mutex,但这对我没有帮助,因为他们只会等待之后执行。我需要一些解决方案来仅在该部分代码的一个线程中执行一次。可能吗?任何人都可以分享一些光吗?

谢谢和亲切的问候,莱安德罗

4

2 回答 2

0

正如您在评论中所说,您在循环体之外声明变量。这就是您的比赛条件的来源。

我们以变量listUserUpdate为例。它由并行执行的线程随机访问。当一个线程仍在添加时,例如在listUserUpdate.Add(gObject);另一个线程中可能已经在重置列表listUserUpdate = new List<RNObject>();或枚举它listUserUpdate.ToArray()

您确实需要将该代码重构为

  • 通过在循环体中移动变量和
  • 使用锁和/或并发集合以同步方式访问数据
于 2014-07-23T22:34:32.197 回答
0

您可以使用双重检查锁定模式。这通常用于单例,但您不会在这里制作单例,因此Lazy<T>不适用一般的单例。

它是这样工作的:

  1. 将您的共享数据分成某种类:

    class QuerySharedData { // All the write-once-read-many fields that need to be shared between threads public QuerySharedData() { // Compute all the write-once-read-many fields. Or use a static Create method if that's handy. } }

  2. 在您的外部类中添加以下内容:

    object padlock; volatile QuerySharedData data

  3. 在线程的回调委托中,执行以下操作:

    if (data == null) { lock (padlock) { if (data == null) { data = new QuerySharedData(); // this does all the work to initialize the shared fields } } } var localData = data

然后使用来自的共享查询数据localData 通过将共享查询数据分组到一个从属类中,您避免了使其各个字段易失性的必要性。

更多关于volatile这里:第 4 部分:高级线程

在这里更新我的假设是,所有持有的类和字段在QuerySharedData初始化后都是只读的。如果这不是真的,例如,如果您初始化一个列表但在许多线程中添加到它,那么这种模式将不适合您。您将不得不考虑使用诸如Thread-Safe Collections 之类的东西。

于 2014-07-23T22:29:43.783 回答