Safe Task Disposal When Using Asynchronous Callbacks

If you acquire or generate data using an asynchronous callback and you have SynchronizeCallbacks set to False, the callback executes on a thread other than the main thread. This document refers to the other thread as a worker thread.

Because you do not have control over switching between the main thread and the worker thread, an exception could be thrown when the worker thread tries to access the Task after the main thread has disposed the Task. To safely dispose the Task, when you do not have control over thread switching, you need a synchronization mechanism by which the main thread can wait for the worker thread to complete all callbacks before the main thread disposes the Task. Also, the main thread needs to notify the worker thread not to initiate additional read or write calls that could queue new callbacks before the worker thread completes pending callbacks.

The following sections explain how to safely dispose the Task when working with asynchronous callbacks.

Execution Path for Using ManualResetEvent to Safely Dispose Task

This diagram demonstrates the execution path for using ManualResetEvent to safely dispose a task.


1378

Using ManualResetEvent to Safely Dispose a Task

VB.NET

Imports System
Imports System.Threading
Imports NationalInstruments
Imports NationalInstruments.DAQmx

Public Class Program

    Shared Sub Main()
        Dim daqReader As SafeTaskDisposer = New SafeTaskDisposer()
        daqReader.StartAcquisition()
        Console.WriteLine("Press enter key to terminate the task...")
        Console.Read()
        daqReader.Dispose()
    End Sub 

End Class 

Public Class SafeTaskDisposer
    Implements IDisposable
    Private runningTaskWaitHandle As ManualResetEvent 
    ' Used for synchronization while disposing task 
    Private runningTask As Task
    Private continuousTask As Task
    Private analogReader As AnalogMultiChannelReader
    Private analogCallback As AsyncCallback
    Private data As AnalogWaveform(Of Double)()

    Public Sub New()
        runningTaskWaitHandle = New ManualResetEvent(True) 
        ' Instantiate wait handle 
    End Sub 

#Region "IDisposable Members" 

    ' Dispose method executes on main thread 
    Public Sub Dispose() Implements IDisposable.Dispose
        SafelyDisposeTask()
    End Sub 

#End Region 

    ' StartAcquisition method executes on main thread 
    Public Sub StartAcquisition()
        Try
            continuousTask = New Task()
            ' Configure the task and create required channels in task
            continuousTask.AIChannels.CreateVoltageChannel("dev1/ai0:3", "", AITerminalConfiguration.Differential, -10.0, 10.0, AIVoltageUnits.Volts)
            continuousTask.Timing.ConfigureSampleClock("", 1000, SampleClockActiveEdge.Rising, SampleQuantityMode.ContinuousSamples, 1000)
            continuousTask.Control(TaskAction.Verify)
            analogReader = New AnalogMultiChannelReader(continuousTask.Stream)
            analogReader.SynchronizeCallbacks = False
            analogCallback = New AsyncCallback(AddressOf AnalogReadCallback)
            ' Create a reference (runningTask) to the task running (continuousTask)
            runningTask = continuousTask
            analogReader.BeginReadWaveform(Convert.ToInt32(continuousTask.Timing.SamplesPerChannel), analogCallback, continuousTask)
        Catch ex As DaqException
            ' Exception thrown
            runningTask = Nothing
            continuousTask.Dispose()
        End Try 
    End Sub 

    ' StopAcquisition method executes on main thread 
    Public Sub StopAcquisition()
        SafelyDisposeTask()
    End Sub 

    ' AnalogReadCallback method executes on 
    '   - worker thread when SynchronizeCallback is set to false (current behavior) 
    '   - main thread when SynchronizeCallback is set to true 
    Private Sub AnalogReadCallback(ByVal ar As IAsyncResult)
        Try 
            ' Check to see whether the task is still running 
            If runningTask IsNot Nothing AndAlso ReferenceEquals(runningTask, ar.AsyncState) Then
                data = analogReader.EndReadWaveform(ar)

                ' If ProcessData is updating UI components created on the main thread, then 
                ' invoke ProcessData on the main thread when SynchronizeCallback is  
                ' set to false.
                ProcessData()

                analogReader.BeginReadWaveform(Convert.ToInt32(continuousTask.Timing.SamplesPerChannel), analogCallback, continuousTask)
            Else 
                ' Signal waiting thread to proceed
                SignalThread()
            End If 
        Catch ex As DaqException
            ' Exception thrown
            runningTask = Nothing
            continuousTask.Dispose()
            ' Signal waiting thread to proceed
            SignalThread()
        End Try 

    End Sub 

    ' ProcessData method executes on same thread on which AnalogReadCallback executes 
    Private Sub ProcessData()
        ' If this method is updating UI components created on the main thread, then invoke 
        ' the UI code or this entire method on the main thread when SynchronizeCallback 
        ' is set to false. 
    End Sub 

    ' SafelyDisposeTask method executes on main thread 
    Private Sub SafelyDisposeTask()
        If runningTask IsNot Nothing Then 
            If Not analogReader.SynchronizeCallbacks Then 
                ' When SynchronizeCallback is false, the callback will be executed on 
                ' the worker thread. In this case, wait for the pending callback to be 
                ' completed before disposing task. 


                ' Reset wait handle to wait for callback, which is on the worker thread
                runningTaskWaitHandle.Reset()

                ' Set runningTask to Nothing to indicate the callback, not to initiate  
                ' additional reads
                runningTask = Nothing 

                ' Wait for pending callback on worker thread.  
                '   - If the last operation on runningTaskWaitHandle is Set (in SignalThread),  
                '     WaitOne will not wait and proceed to execute the next line of code.  
                '   - If the last operation on runningTaskWaitHandle is Reset (in  
                '     SafelyDisposeTask), WaitOne will make the main thread wait for the  
                '     signal (Set Operation on runningTaskWaitHandle) 
                runningTaskWaitHandle.WaitOne()
            Else 
                ' When SynchronizeCallback is true, this method and the callback will  
                ' be executed on the main thread. In that case, there is no need to wait for pending  
                ' callback to be completed before disposing task, because the callack  
                ' and this method cannot execute at the same time. 


                ' Set runningTask to Nothing to indicate the callback, not to initiate  
                ' additional reads
                runningTask = Nothing 
            End If 
        End If 
        ' Safe to dispose task. 
        If continuousTask IsNot Nothing Then
            continuousTask.Dispose()
        End If 
    End Sub 

    ' SignalThread method executes on worker thread 
    Private Sub SignalThread()
        ' When SynchronizeCallback is false, the callback will be executed on the worker  
        ' thread. In that case, while disposing task, the main thread is waiting on the callback 
        ' which is executing on the worker thread to be completed. Signal (call Set  
        ' operation on runningTaskWaitHandle) the main thread to proceed 
        If Not analogReader.SynchronizeCallbacks Then
            runningTaskWaitHandle.Set()
        End If 
    End Sub 
End Class

C#

using System;
using System.Threading;
using NationalInstruments;
using NationalInstruments.DAQmx;

public class Program
{
    static void Main(string[] args)
    {
        SafeTaskDisposer daqReader = new SafeTaskDisposer();
        daqReader.StartAcquisition();
        Console.WriteLine("Press Enter key to terminate the task...");
        Console.Read();
        daqReader.Dispose();
    }
}
public class SafeTaskDisposer : IDisposable
{
    private ManualResetEvent runningTaskWaitHandle; 
    // Used for synchronization while disposing task 
    private Task runningTask;
    private Task continuousTask;
    private AnalogMultiChannelReader analogReader;
    private AsyncCallback analogCallback;
    private AnalogWaveform<double>[] data;

    public SafeTaskDisposer()
    {
        runningTaskWaitHandle = new ManualResetEvent(true); 
        // Instantiate wait handle
    }

    #region IDisposable Members

    // Dispose method executes on the main thread 
    public void Dispose()
    {
        SafelyDisposeTask();
    }

    #endregion 

    // StartAcquisition method executes on the main thread 
    public void StartAcquisition()
    {
        try
        {
            continuousTask = new Task();
            // Configure the task and create required channels in task
            continuousTask.AIChannels.CreateVoltageChannel("dev1/ai0:3", "", AITerminalConfiguration.Differential, -10.0, 10.0, AIVoltageUnits.Volts);
            continuousTask.Timing.ConfigureSampleClock("", 1000, SampleClockActiveEdge.Rising, SampleQuantityMode.ContinuousSamples, 1000);
            continuousTask.Control(TaskAction.Verify);
            analogReader = new AnalogMultiChannelReader(continuousTask.Stream);
            analogReader.SynchronizeCallbacks = false;
            analogCallback = new AsyncCallback(AnalogReadCallback);
            // Create a reference (runningTask) to the task running (continuousTask)
            runningTask = continuousTask;
            analogReader.BeginReadWaveform(Convert.ToInt32(continuousTask.Timing.SamplesPerChannel), analogCallback, continuousTask);
        }
        catch (DaqException ex)
        {
            // Exception thrown
            runningTask = null;
            continuousTask.Dispose();
        }
    }

    // StopAcquisition method executes on the main thread 
    public void StopAcquisition()
    {
        SafelyDisposeTask();
    }

    // AnalogReadCallback method executes on 
    //   - worker thread when SynchronizeCallback is set to false (current behavior) 
    //   - main thread when SynchronizeCallback is set to true 
    private void AnalogReadCallback(IAsyncResult ar)
    {
        try
        {
            // Check to see whether the task is still running 
            if (runningTask != null && runningTask == ar.AsyncState)
            {
                data = analogReader.EndReadWaveform(ar);

                // If ProcessData is updating UI components created on the main thread, then 
                // invoke ProcessData on the main thread when SynchronizeCallback is  
                // set to false.
                ProcessData();

                analogReader.BeginReadWaveform(Convert.ToInt32(continuousTask.Timing.SamplesPerChannel), analogCallback, continuousTask);
            }
            else
            {
                // Signal waiting thread to proceed
                SignalThread();

            }
        }
        catch (DaqException ex)
        {
            // Exception thrown
            runningTask = null;
            continuousTask.Dispose();
            // Signal waiting thread to proceed
            SignalThread();
        }
    }

    // ProcessData method executes on same thread on which AnalogReadCallback executes 
    private void ProcessData()
    {
        // If this method is updating UI components created on the main thread, then invoke 
        // the UI code or this entire method on the main thread when SynchronizeCallback 
        // is set to false.
    }

    // SafelyDisposeTask method executes on the main thread 
    private void SafelyDisposeTask()
    {
        if (runningTask != null)
        {
            if (!analogReader.SynchronizeCallbacks)
            {
                // When SynchronizeCallback is false, the callback will be executed on  
                // the worker thread. In that case, wait for the pending callback to be  
                // completed before disposing task. 

                // Reset wait handle to wait for callback, which is on the worker thread
                runningTaskWaitHandle.Reset();

                // Set runningTask to null to indicate the callback, not to initiate  
                // additional reads
                runningTask = null;

                // Wait for pending callback on the worker thread.  
                //   - If the last operation on runningTaskWaitHandle is Set (in SignalThread),  
                //     WaitOne will not wait and proceed to execute the next line of code. 
                //   - If the last operation on runningTaskWaitHandle is Reset (in  
                //     SafelyDisposeTask), WaitOne will make the main thread wait for the signal  
                //     (Set Operation on runningTaskWaitHandle)

                runningTaskWaitHandle.WaitOne();
            }
            else
            {
                // When SynchronizeCallback is true, this method and the callback will  
                // be executed on the main thread. In that case, there is no need to wait for pending  
                // callback to be completed before disposing task, because the callback  
                // and this method cannot execute at the same time. 

                // Set runningTask to null to indicate the callback, not to initiate  
                // additional reads
                runningTask = null;
            }
        }
        // Safe to dispose task 
        if (continuousTask != null)
        {
            continuousTask.Dispose();
        }
    }

    // SignalThread method executes on the worker thread 
    private void SignalThread()
    {
        // When SynchronizeCallback is false, the callback will be executed on the worker  
        // thread. In that case, while disposing task, the main thread is waiting on the callback 
        // which is executing on the worker thread to be completed. Signal (call Set  
        // operation on runningTaskWaitHandle) the main thread to proceed 
        if (!analogReader.SynchronizeCallbacks)
        {
            runningTaskWaitHandle.Set();
        }
    }
}

Special Threading Considerations

  • If you update any UI component from a thread other than the thread on which that component was created, you might get an InvalidOperationException. Refer to How to: Make Thread-Safe Calls to Windows Forms Controls for more details.
  • Calls to update any UI component should be asynchronous (non-blocking); if the calls are not asynchronous, you risk race conditions and deadlock in your application. These issues could occur because National Instruments recommends you use a synchronizing mechanism to dispose Task.
  • Using memory-optimized read methods in these situations could cause a loss of data integrity.