22 Feb

Streaming pipeline and using context ResourceTracker to avoid disposed streams

Recently there have been a few really good resources published on streaming pipeline handling. You can find some of them here and here.

The Optimizing Pipeline Performance MSDN article has two great examples of how to use some of the classes in Microsoft.BizTalk.Streaming.dll. The Execute method of the first example looks something like the code below.

public IBaseMessage Execute(IPipelineContext context, IBaseMessage message)
{
    try
    {
        ...
        IBaseMessageContext messageContext = message.Context;
        if (string.IsNullOrEmpty(xPath) && string.IsNullOrEmpty(propertyValue))
        {
            throw new ArgumentException(...);
        }
        IBaseMessagePart bodyPart = message.BodyPart;
        Stream inboundStream = bodyPart.GetOriginalDataStream();
        VirtualStream virtualStream = new VirtualStream(bufferSize, thresholdSize);
        ReadOnlySeekableStream readOnlySeekableStream = new ReadOnlySeekableStream(inboundStream, virtualStream, bufferSize);
        XmlTextReader xmlTextReader = new XmlTextReader(readOnlySeekableStream);
        XPathCollection xPathCollection = new XPathCollection();
        XPathReader xPathReader = new XPathReader(xmlTextReader, xPathCollection);
        xPathCollection.Add(xPath);
        bool ok = false;
        while (xPathReader.ReadUntilMatch())
        {
            if (xPathReader.Match(0) && !ok)
            {
                propertyValue = xPathReader.ReadString();
                messageContext.Promote(propertyName, propertyNamespace, propertyValue);
                ok = true;
            }
        }
        readOnlySeekableStream.Position = 0;
        bodyPart.Data = readOnlySeekableStream;
    }
    catch (Exception ex)
    {
        if (message != null)
        {
            message.SetErrorInfo(ex);
        }
        ...
        throw; // rethrow without resetting the original stack trace
    }
    return message;
}

We used this example as a base when developing something very similar in a recent project. At first everything worked fine, but after a while we started getting an error saying:

Cannot access a disposed object. Object name: DataReader

It took us a while to figure out the real problem here. Everything worked fine when sending in simple messages, but as soon as we used the code in a pipeline where we also debatched messages, we got the “disposed object” problem.

It turns out that when we debatched messages, the Execute method of the custom pipeline component ran multiple times, once for each sub-message. This forced the .NET Garbage Collector to run.

The GC found the XmlTextReader that we used to read the stream to be unreferenced and decided to destroy it.

The problem is that this will also dispose the readOnlySeekableStream that we connected to our message data object!

It’s then the BizTalk End Point Manager (EPM) that throws the error as it hits a disposed stream object when trying to read the message body and save it to the BizTalkMsgBox!
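The coupling between the reader and the stream described above can be seen in isolation with a small console sketch. It assumes a plain MemoryStream in place of the BizTalk ReadOnlySeekableStream so that it runs without the BizTalk assemblies, and the class and method names are made up for illustration. It only shows that explicitly closing an XmlTextReader also closes the stream it wraps; it does not try to reproduce when or whether the GC triggers that.

```csharp
using System;
using System.IO;
using System.Text;
using System.Xml;

static class ReaderCloseDemo
{
    // Returns true when closing the reader also closed the wrapped stream.
    public static bool StreamIsClosedAfterReaderClose()
    {
        // Assumption: MemoryStream stands in for the pipeline's body stream.
        Stream stream = new MemoryStream(Encoding.UTF8.GetBytes("<root>value</root>"));
        XmlTextReader reader = new XmlTextReader(stream);

        reader.MoveToContent(); // position the reader on the <root> element
        reader.ReadString();    // read its text content

        // Closing the reader also closes the stream it was constructed on.
        reader.Close();

        // A closed MemoryStream reports that it can no longer be read.
        return !stream.CanRead;
    }

    static void Main()
    {
        Console.WriteLine("Stream closed with reader: " + StreamIsClosedAfterReaderClose());
    }
}
```

Anything that still holds the stream after the reader has been closed, as the message body part does in the pipeline, will fail on its next read.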

ResourceTracker to the rescue!

It turns out that the BizTalk pipeline context object (the IPipelineContext passed to Execute) has a nice little class connected to it called the ResourceTracker. This object has an “AddResource” method that makes it possible to add an object, and the context will then hold a reference to that object. This tells the GC not to dispose of it!

So after adding the line below just before the end of the method, everything works fine, even when debatching messages!

context.ResourceTracker.AddResource(xmlTextReader);
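The idea of holding on to a reference can be sketched without BizTalk. In the sketch below, SimpleResourceTracker is a hypothetical stand-in written for this example, not the BizTalk ResourceTracker class, and the other names are made up as well. It only illustrates that an object registered with a longer-lived tracker stays reachable across a garbage collection.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text;
using System.Xml;

// Hypothetical stand-in for the pipeline context's ResourceTracker.
// It only mimics the "hold a strong reference" part of the idea.
sealed class SimpleResourceTracker
{
    private readonly List<object> resources = new List<object>();

    public void AddResource(object resource)
    {
        resources.Add(resource);
    }
}

static class TrackerDemo
{
    // Creates the reader in its own stack frame so that no local variable
    // of the caller keeps it reachable; only the tracker does.
    [MethodImpl(MethodImplOptions.NoInlining)]
    static WeakReference CreateTrackedReader(SimpleResourceTracker tracker)
    {
        Stream stream = new MemoryStream(Encoding.UTF8.GetBytes("<root/>"));
        XmlTextReader reader = new XmlTextReader(stream);
        tracker.AddResource(reader);
        return new WeakReference(reader);
    }

    // Returns true when the tracked reader is still alive after a collection.
    public static bool TrackedReaderSurvivesGc()
    {
        SimpleResourceTracker tracker = new SimpleResourceTracker();
        WeakReference weak = CreateTrackedReader(tracker);

        GC.Collect();
        GC.WaitForPendingFinalizers();

        bool alive = weak.IsAlive;

        // Keep the tracker itself reachable until after the check above.
        GC.KeepAlive(tracker);
        return alive;
    }

    static void Main()
    {
        Console.WriteLine("Reader alive after GC: " + TrackedReaderSurvivesGc());
    }
}
```

In the pipeline component the pipeline context plays the role of the tracker's owner, which is why the call goes through context.ResourceTracker rather than a field on the component.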

There's 6 Comments So Far

  • Alister
    June 16th, 2010 at 3:19 pm

    Hi,

    Many thanks for this post; I don’t quite understand one part of it, though. You say that when the GC runs it will call the Dispose() method on the XmlTextReader, but as far as I can see this is not the case.

    As a test, I did the following:

    Stream s = new FileStream("input.xml", FileMode.Open, FileAccess.Read);
    XmlTextReader xtr = new XmlTextReader(s);
    WeakReference wr = new WeakReference(xtr);
    xtr = null;
    GC.Collect();
    GC.WaitForPendingFinalizers();
    Console.WriteLine(wr.IsAlive);
    s.Seek(0, SeekOrigin.Begin);
    s.ReadByte();

    The WeakReference is just there to verify that the XmlTextReader got finalized (so IsAlive is false after the GC is run).

    Given what you say, I would expect the stream to have been disposed, and for the final Seek() to throw an exception. But in fact it works, and ReadByte() confirms that the stream is still open.

    If instead of setting the variable xtr to null I call xtr.Dispose(), then it does throw an exception. But just allowing it to go out of scope does not appear to cause that.

    So how could allowing the XmlTextReader to go out of scope cause your problem?

    Thanks,
    Alister.

  • Alister
    June 18th, 2010 at 2:14 pm

    Hello again,

    After extensive testing of a similar scenario, I believe it’s actually the VirtualStream which you should be adding to the ResourceTracker, and not the XmlTextReader.

    It’s possible that adding the XmlTextReader is working, but by accident. It would be interesting if you were to try changing it – certainly in my situation adding the VirtualStream is what fixed the problem, and this is in line with what I would expect given my understanding of how the pipeline works.

    Regards,
    Alister.

  • Tabarak
    November 11th, 2011 at 4:43 pm

    In my case, I am getting this error from Flat File Disassembler component in my Receive pipeline with Recoverable Interchange Processing set to False. If set to True, the issue is resolved. However, my requirement is to fail the entire interchange and hence would like to set it to False. Any help would be appreciated.

    Thanks!

  • Tabarak
    November 14th, 2011 at 4:08 pm

    I am using a Flat File disassembler with Recoverable Interchange set to False in my custom receive pipeline. In the Decode stage, through a .NET component, I am reading the stream and copying it to another folder, promoting a few properties, and creating a header-level record in the db. Next, in the Validate stage, again using a .NET component, I am using the BizTalk Mapper to transform the messages to the desired schema. As mentioned, I tried adding the VirtualStream to the ResourceTracker here, but I still continue to get the ‘DataReader’ error. Any suggestions?

  • Richard
    November 14th, 2011 at 4:35 pm

    I’ve seen this before but haven’t had time to look into it. The common solution is to change the interchange behaviour, but that’s of course a really bad workaround …
