3

我有一个 JSON 格式的 OData 响应以几 MB 为单位,要求是流式传输“JSON 的某些部分”,甚至无需将它们加载到内存中。

例如:当我value[0].Body.Content在下面的 JSON 中读取属性“”(将以 MB 为单位)时,我想将此值部分流式传输,而不将其反序列化为字符串类型的对象。所以基本上将值部分读入一个固定大小的字节数组并将该字节数组写入目标流(重复该步骤直到该数据完成处理)。

JSON:

{
    "@odata.context": "https://localhost:5555/api/v2.0/$metadata#Me/Messages",
    "value": [
        {
            "@odata.id": "https://localhost:5555/api/v2.0/",
            "@odata.etag": "W/\"Something\"",
            "Id": "vccvJHDSFds43hwy98fh",
            "CreatedDateTime": "2018-12-01T01:47:53Z",
            "LastModifiedDateTime": "2018-12-01T01:47:53Z",
            "ChangeKey": "SDgf43tsdf",
            "WebLink": "https://localhost:5555/?ItemID=dfsgsdfg9876ijhrf",
            "Body": {
                "ContentType": "HTML",
                "Content": "<html>\r\n<body>Huge Data Here\r\n</body>\r\n</html>\r\n"
            },
            "ToRecipients": [{
                    "EmailAddress": {
                        "Name": "ME",
                        "Address": "me@me.com"
                    }
                }
            ],
            "CcRecipients": [],
            "BccRecipients": [],
            "ReplyTo": [],
            "Flag": {
                "FlagStatus": "NotFlagged"
            }
        }
    ],
    "@odata.nextLink": "http://localhost:5555/rest/jersey/sleep?%24filter=LastDeliveredDateTime+ge+2018-12-01+and+LastDeliveredDateTime+lt+2018-12-02&%24top=50&%24skip=50"
}

尝试的方法:
1. Newtonsoft

我最初尝试使用 Newtonsoft 流,但它在内部将数据转换为字符串并加载到内存中。(这会导致 LOH 突然上升,并且在压缩发生之前内存不会被释放 - 我们的工作进程有内存限制,无法将其保存在内存中)

**code:**

    using (var jsonTextReader = new JsonTextReader(sr))
    {
        var pool = new CustomArrayPool();
        // Checking if pooling will help with memory
        jsonTextReader.ArrayPool = pool;

        while (jsonTextReader.Read())
        {
            if (jsonTextReader.TokenType == JsonToken.PropertyName
                && ((string)jsonTextReader.Value).Equals("value"))
            {
                jsonTextReader.Read();

                if (jsonTextReader.TokenType == JsonToken.StartArray)
                {
                    while (jsonTextReader.Read())
                    {
                        if (jsonTextReader.TokenType == JsonToken.StartObject)
                        {
                            var Current = JToken.Load(jsonTextReader);
                            // By Now, the LOH Shoots up.
                            // Avoid below code of converting this JToken back to byte array.
                            destinationStream.write(Encoding.ASCII.GetBytes(Current.ToString()));
                        }
                        else if (jsonTextReader.TokenType == JsonToken.EndArray)
                        {
                            break;
                        }
                    }
                }
            }

            if (jsonTextReader.TokenType == JsonToken.StartObject)
            {
                var Current = JToken.Load(jsonTextReader);
                // Do some processing with Current
                destinationStream.write(Encoding.ASCII.GetBytes(Current.ToString()));
            }
        }
    }
  1. OData.Net:

    我在想这是否可以使用 OData.Net 库,因为它看起来支持字符串字段的流式传输。但不能走得太远,因为我最终为数据创建了一个模型,这意味着该值将被转换为 MB 的一个字符串对象。

    代码

    ODataMessageReaderSettings settings = new ODataMessageReaderSettings();
    IODataResponseMessage responseMessage = new InMemoryMessage { Stream = stream };
    responseMessage.SetHeader("Content-Type", "application/json;odata.metadata=minimal;");
    // ODataMessageReader reader = new ODataMessageReader((IODataResponseMessage)message, settings, GetEdmModel());
    ODataMessageReader reader = new ODataMessageReader(responseMessage, settings, new EdmModel());
    var oDataResourceReader = reader.CreateODataResourceReader();
    var property = reader.ReadProperty();
    


知道如何使用 OData.Net/Newtonsoft 和某些字段的流值部分解析这个 JSON 吗?
这样做的唯一方法是手动解析流吗?

4

1 回答 1

3

如果您将 JSON 的一部分从一个流复制到另一个流,则可以更有效地执行此操作,JsonWriter.WriteToken(JsonReader)从而避免中间Current = JToken.Load(jsonTextReader)Encoding.ASCII.GetBytes(Current.ToString())表示及其相关的内存开销:

using (var textWriter = new StreamWriter(destinationStream, new UTF8Encoding(false, true), 1024, true))
using (var jsonWriter = new JsonTextWriter(textWriter) { Formatting = Formatting.Indented, CloseOutput = false })
{
    // Use Formatting.Indented or Formatting.None as required.
    jsonWriter.WriteToken(jsonTextReader);
}

但是,Json.NETJsonTextReader无法以与 .NET 相同的方式读取“块”中的单个字符串值XmlReader.ReadValueChunk()。它将始终完全实现每个原子字符串值。如果你的字符串值太大以至于它们在大对象堆上,即使使用JsonWriter.WriteToken()也不会阻止这些字符串完全加载到内存中。

作为替代方案,您可以考虑由JsonReaderWriterFactory. 这些读取器和写入器在 JSON 被读取写入DataContractJsonSerializer时被即时使用并将其转换为 XML 。由于这些读取器和写入器的基类是and ,因此它们确实支持以块的形式读取和写入字符串值。适当地使用它们将避免在大对象堆中分配字符串。XmlReaderXmlWriter

为此,首先定义以下扩展方法,将选定的 JSON 值子集从输入流复制到输出流,由要流式传输的数据的路径指定:

public static class JsonExtensions
{
    public static void StreamNested(Stream from, Stream to, string [] path)
    {
        var reversed = path.Reverse().ToArray();

        using (var xr = JsonReaderWriterFactory.CreateJsonReader(from, XmlDictionaryReaderQuotas.Max))
        {
            foreach (var subReader in xr.ReadSubtrees(s => s.Select(n => n.LocalName).SequenceEqual(reversed)))
            {
                using (var xw = JsonReaderWriterFactory.CreateJsonWriter(to, Encoding.UTF8, false))
                {
                    subReader.MoveToContent();

                    xw.WriteStartElement("root");
                    xw.WriteAttributes(subReader, true);

                    subReader.Read();

                    while (!subReader.EOF)
                    {
                        if (subReader.NodeType == XmlNodeType.Element && subReader.Depth == 1)
                            xw.WriteNode(subReader, true);
                        else
                            subReader.Read();
                    }

                    xw.WriteEndElement();
                }
            }
        }
    }
}

public static class XmlReaderExtensions
{
    public static IEnumerable<XmlReader> ReadSubtrees(this XmlReader xmlReader, Predicate<Stack<XName>> filter)
    {
        Stack<XName> names = new Stack<XName>();

        while (xmlReader.Read())
        {
            if (xmlReader.NodeType == XmlNodeType.Element)
            {
                names.Push(XName.Get(xmlReader.LocalName, xmlReader.NamespaceURI));
                if (filter(names))
                {
                    using (var subReader = xmlReader.ReadSubtree())
                    {
                        yield return subReader;
                    }
                }
            }

            if ((xmlReader.NodeType == XmlNodeType.Element && xmlReader.IsEmptyElement)
                || xmlReader.NodeType == XmlNodeType.EndElement)
            {
                names.Pop();
            }
        }
    }
}

现在,string [] path参数 toStreamNested()不是任何类型的路径。相反,它是对应于 XML 元素层次结构的路径,该层次结构对应于您要选择的 JSON,XmlReader返回的JsonReaderWriterFactory.CreateJsonReader(). 反过来,用于此翻译的映射由 Microsoft 在JSON 和 XML 之间的映射中记录。要仅选择和流式传输匹配的那些 JSON 值value[*],所需的 XML 路径是//root/value/item. 因此,您可以通过执行以下操作来选择和流式传输所需的嵌套对象:

JsonExtensions.StreamNested(inputStream, destinationStream, new[] { "root", "value", "item" });

笔记:

  • JSON 和 XML 之间的映射有点复杂。XDocument使用以下扩展方法将一些示例 JSON 加载到一个中通常更容易

    static XDocument ParseJsonAsXDocument(string json)
    {
        using (var xr = JsonReaderWriterFactory.CreateJsonReader(new MemoryStream(Encoding.UTF8.GetBytes(json)), Encoding.UTF8, XmlDictionaryReaderQuotas.Max, null))
        {
            return XDocument.Load(xr);
        }
    }
    

    然后通过观察确定正确的 XML 路径。

  • 有关相关问题,请参阅.NET 中的 JObject.SelectToken Equivalent

于 2019-05-08T22:28:24.820 回答