
I have a service which talks to around 50 devices over Ethernet using TCP. These devices push data at various intervals (ranging from 50ms to 1500ms). Hence I use a BlockingCollection per device to queue the received data, which is then processed on a different thread.

To consume and process the data from the BlockingCollection, I used an explicit thread per device as shown below

private void ThreadProc(object o)
{
    while (true)
    {
        DeviceData data = m_blockingCollection.Take();
        ProcessData(data);
    }
}

IMHO, this is a simple and elegant solution since these threads will be blocked if there is no data available and will not consume CPU.

One alternate approach (recommended by colleagues) is a timer with an interval of 250ms. But I have a gut feeling that scheduling around 50 operations every 250 milliseconds might be costlier. For faster devices this will slow down processing and for slower devices this will cause unnecessary execution of the timer logic.

private void OnTimeOut(object o)
{
    if (m_blockingCollection.TryTake(out DeviceData data))
    {
        ProcessData(data);
    }
}

While researching this, I also found out about ThreadPool.RegisterWaitForSingleObject, which looks apt for the problem. Hence I modified the code as below

AutoResetEvent m_dataEvent = new AutoResetEvent(false);
RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject
                         (m_dataEvent, ConsumeAndProcess, null, -1, false);

private void OnDeviceDataRecevied(DeviceData data)
{
    sm_blockingCollection.Add(data);
    m_dataEvent.Set();
}

private static void ConsumeAndProcess(object state, bool timedOut)
{
    if (sm_blockingCollection.TryTake(out DeviceData data))
    {
        ProcessData(data);
    }
}

Did I think correctly? Is the 3rd approach better than the first and second? Which will be better in terms of efficiency and in terms of less resource usage?

Arctic
  • Personally I'd use TPL Dataflow or Rx for this, which gives you the ability to pipeline the data serially or asynchronously, maintain order or not, buffer, and process it in any weird and wonderful threaded / parallel way you'd like (or not) – TheGeneral Jul 29 '20 at 08:49
  • Rather than a separate thread per device, have you considered using https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1.takefromany?view=netcore-3.1#System_Collections_Concurrent_BlockingCollection_1_TakeFromAny_System_Collections_Concurrent_BlockingCollection__0_____0__ ? – mjwills Jul 29 '20 at 08:52
  • @TheGeneral Thank you. If you were using Rx for the same scenario, would you have observed on NewThreadScheduler or ThreadPoolScheduler? – Arctic Jul 29 '20 at 09:03
  • It is not better; that code has a very nasty threading race bug. It happens when OnDeviceDataRecevied() runs twice before ConsumeAndProcess() can respond to the first Set(). That is a very likely mishap with that many devices, and quite hard to debug. Just don't use a thread at all; use BeginRead() (old style) or ReadAsync(). – Hans Passant Jul 29 '20 at 12:06
  • @HansPassant Yeah, you are right about that, thank you. I was initially processing the data (processing takes around 500ms) in the EndRead method block. But some of those devices crashed, and hence I decided to move the data to a blocking collection and process it from a different thread. – Arctic Jul 29 '20 at 12:38
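
The race described in the comments can be made concrete with a small sketch. An AutoResetEvent does not count signals, so two Set() calls that land before the wait is released collapse into one callback, and a callback that takes only one item leaves the second one stranded. The version below drains the queue on every signal instead. The class name, the int payload (standing in for DeviceData), and the counter are assumptions made for illustration, not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class DrainOnSignalDemo
{
    // Hypothetical stand-ins for the question's fields; int replaces DeviceData.
    private static readonly BlockingCollection<int> sm_queue = new BlockingCollection<int>();
    private static readonly AutoResetEvent sm_dataEvent = new AutoResetEvent(false);
    private static int sm_processed;

    private static void OnDeviceDataReceived(int data)
    {
        sm_queue.Add(data);
        // Setting an already-signaled AutoResetEvent has no further effect,
        // so back-to-back calls may produce only one callback.
        sm_dataEvent.Set();
    }

    private static void ConsumeAndProcess(object state, bool timedOut)
    {
        // Drain everything that is queued, not just one item per signal.
        while (sm_queue.TryTake(out _))
        {
            Interlocked.Increment(ref sm_processed);
        }
    }

    public static int Run()
    {
        RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject(
            sm_dataEvent, ConsumeAndProcess, null, Timeout.Infinite, false);

        // Two items arrive back to back, possibly before the first callback runs.
        OnDeviceDataReceived(1);
        OnDeviceDataReceived(2);

        // Wait (up to 5 s) until both items have been consumed.
        SpinWait.SpinUntil(() => Volatile.Read(ref sm_processed) == 2, TimeSpan.FromSeconds(5));
        reg.Unregister(null);
        return Volatile.Read(ref sm_processed);
    }
}
```

Draining removes the lost-item problem, but callbacks may still overlap on different pool threads, so per-device processing order is not guaranteed with this approach. That is one more reason to prefer a plain blocking consumer or an async queue over hand-rolled signaling.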

1 Answer


Going from a BlockingCollection to a Timer looks like a technological regression to me. And implementing a solution on top of AutoResetEvent looks like an attempt to re-implement a BlockingCollection, with a low probability of building a better one, and a significant probability of building a buggy one.

Having 50 threads blocked most of the time is not that terrible (each thread reserves "only" about 1 MB of stack memory by default), but this setup does not scale well. Fortunately there are now built-in tools that let you upgrade from a blocking collection to an async collection, and keep the advantages of the BlockingCollection (responsiveness, low CPU utilization) without the disadvantages (blocked threads). For example, using Channels you could go from 50 devices to 5,000 devices, possibly using a smaller number of threads than you are currently using (shared ThreadPool threads instead of dedicated threads).

private Channel<DeviceData> m_channel = Channel.CreateUnbounded<DeviceData>();

private async Task DeviceProcessorAsync()
{
    while (await m_channel.Reader.WaitToReadAsync())
    {
        while (m_channel.Reader.TryRead(out DeviceData data))
        {
            ProcessData(data);
        }
    }
}

Channels are built into .NET Core, and available as the System.Threading.Channels package for .NET Framework.
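
The snippet above shows only the consumer. As a rough sketch of how the pieces might fit together per device, the following adds a producer side and a shutdown path. The DeviceWorker and ChannelDemo classes, the Stop method, the ProcessedCount property, and the shape of DeviceData are all assumptions made for illustration.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical payload type; the real DeviceData is not shown in the question.
public sealed class DeviceData
{
    public int Value { get; set; }
}

// Hypothetical per-device wrapper: one channel and one consumer loop per device.
public sealed class DeviceWorker
{
    // SingleReader is an optional hint that only one consumer reads from the channel.
    private readonly Channel<DeviceData> m_channel = Channel.CreateUnbounded<DeviceData>(
        new UnboundedChannelOptions { SingleReader = true });

    public int ProcessedCount { get; private set; }

    // Producer side: call this from the receive path.
    // On an unbounded channel TryWrite returns false only after Complete().
    public bool OnDeviceDataReceived(DeviceData data) => m_channel.Writer.TryWrite(data);

    // Signals that no more data will arrive, which lets RunAsync finish.
    public void Stop() => m_channel.Writer.Complete();

    // Consumer side: no thread is blocked while the channel is empty.
    public async Task RunAsync()
    {
        while (await m_channel.Reader.WaitToReadAsync())
        {
            while (m_channel.Reader.TryRead(out DeviceData data))
            {
                ProcessData(data);
            }
        }
    }

    // Placeholder for the real processing; here it only counts items.
    private void ProcessData(DeviceData data) => ProcessedCount++;
}

public static class ChannelDemo
{
    public static async Task<int> RunAsync()
    {
        var worker = new DeviceWorker();
        Task consumer = worker.RunAsync();   // start the consumer loop

        for (int i = 0; i < 3; i++)
        {
            worker.OnDeviceDataReceived(new DeviceData { Value = i });
        }

        worker.Stop();        // complete the writer
        await consumer;       // the loop exits once the channel is drained
        return worker.ProcessedCount;
    }
}
```

With one such worker per device, idle devices cost no threads at all. Note that if ProcessData is synchronous and slow (the comments mention around 500ms), it still occupies a ThreadPool thread for that duration, so the savings come from the idle time rather than from the processing itself.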

Theodor Zoulias
  • Thank you. I might go with the first approach (thread-based) for the time being, and will definitely plan to upgrade and try your solution. – Arctic Jul 29 '20 at 13:10