Skip to content

Commit

Permalink
Merge pull request #2256 from AllenInstitute/feature/2256-improve_asy…
Browse files Browse the repository at this point in the history
…nc_error_handling

ASYNC: Improved error status transfer from workers to main thread
  • Loading branch information
t-b authored Sep 9, 2024
2 parents 99f2c0e + 9a0fbf4 commit 939b261
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 78 deletions.
60 changes: 35 additions & 25 deletions Packages/MIES/MIES_Async.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ static StrConstant ASYNC_WORKLOADCLASS_STR = "workloadClass"
static StrConstant ASYNC_PARAMCOUNT_STR = "paramCount"
static StrConstant ASYNC_INORDER_STR = "inOrder"
static StrConstant ASYNC_ABORTFLAG_STR = "abortFlag"
static StrConstant ASYNC_ERROR_STR = "err"
static StrConstant ASYNC_ERRORMSG_STR = "errmsg"
static StrConstant ASYNC_RTERROR_STR = "rtErr"
static StrConstant ASYNC_RTERRORMSG_STR = "rtErrMessage"
static StrConstant ASYNC_ABORTCODE_STR = "abortCode"
static StrConstant ASYNC_WLCOUNTER_STR = "workloadClassCounter"
///@}

Expand Down Expand Up @@ -86,13 +87,8 @@ End

/// @brief Prototype function for an async readout function
///
/// @param dfr reference to returned data folder from thread
/// @param err error code, only set in the error case
/// @param errmsg error message, only set in the error case
Function ASYNC_ReadOut(dfr, err, errmsg)
DFREF dfr
variable err
string errmsg
/// @param ar ASYNC readout structure
Function ASYNC_ReadOut(STRUCT ASYNC_ReadOutStruct &ar)
End

/// @brief thread function that receives data folders from the thread input queue
Expand Down Expand Up @@ -134,19 +130,26 @@ threadsafe static Function/DF ASYNC_Run_Worker(DFREF dfr)
DFREF dfrInp = dfr:input
DFREF dfrAsync = dfr:async

variable/G dfrAsync:$ASYNC_ERROR_STR = 0
NVAR err = dfrAsync:$ASYNC_ERROR_STR
string/G dfrAsync:$ASYNC_ERRORMSG_STR = ""
SVAR errmsg = dfrAsync:$ASYNC_ERRORMSG_STR
variable/G dfrAsync:$ASYNC_RTERROR_STR = 0
variable/G dfrAsync:$ASYNC_ABORTCODE_STR = 0
string/G dfrAsync:$ASYNC_RTERRORMSG_STR = ""
NVAR rtErr = dfrAsync:$ASYNC_RTERROR_STR
NVAR abortCode = dfrAsync:$ASYNC_ABORTCODE_STR
SVAR rtErrMsg = dfrAsync:$ASYNC_RTERRORMSG_STR

err = 0
dfrOut = $""
AssertOnAndClearRTError()
try
dfrOut = f(dfrInp); AbortOnRTE
catch
errmsg = GetRTErrMessage()
err = ClearRTError()
rtErrMsg = GetRTErrMessage()
rtErr = ClearRTError()
if(!rtErr)
rtErrMsg = ""
endif
if(V_AbortCode != ABORTCODE_ABORTONRTE)
abortCode = V_AbortCode
endif
endtry

if(IsFreeDatafolder(dfrOut))
Expand All @@ -168,7 +171,8 @@ Function ASYNC_ThreadReadOut()
variable justBuffered

variable wlcIndex, statCnt, index
string rterrmsg
string msg
STRUCT ASYNC_ReadOutStruct ar
NVAR tgID = $GetThreadGroupID()
ASSERT(!isNaN(tgID), "Async frame work is not running")
WAVE/DF DFREFbuffer = GetDFREFBuffer(getAsyncHomeDF())
Expand Down Expand Up @@ -238,19 +242,25 @@ Function ASYNC_ThreadReadOut()

track[%$workloadClass][%OUTPUTCOUNT] += 1

SVAR RFunc = dfr:$ASYNC_READOUTFUNC_STR
FUNCREF ASYNC_ReadOut f = $RFunc
NVAR err = dfr:$ASYNC_ERROR_STR
SVAR errmsg = dfr:$ASYNC_ERRORMSG_STR
SVAR RFunc = dfr:$ASYNC_READOUTFUNC_STR
FUNCREF ASYNC_ReadOut f = $RFunc
NVAR rtErr = dfr:$ASYNC_RTERROR_STR
SVAR rtErrMsg = dfr:$ASYNC_RTERRORMSG_STR
NVAR abortCode = dfr:$ASYNC_ABORTCODE_STR

ar.dfr = dfrOut
ar.rtErr = rtErr
ar.rtErrMsg = rtErrMsg
ar.abortCode = abortCode

statCnt += 1
AssertOnAndClearRTError()
try
f(dfrOut, err, errmsg); AbortOnRTE
f(ar); AbortOnRTE
catch
rterrmsg = GetRTErrMessage()
ClearRTError()
ASSERT(0, "ReadOut function " + RFunc + " aborted with: " + rterrmsg)
msg = GetRTErrMessage()
ASSERT(!ClearRTError(), "ReadOut function " + RFunc + " encountered an RTE: " + msg)
ASSERT(!V_AbortCode, "ReadOut function " + RFunc + " aborted with code: " + GetErrMessage(V_AbortCode))
endtry

endfor
Expand Down
10 changes: 10 additions & 0 deletions Packages/MIES/MIES_Constants.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -2307,3 +2307,13 @@ Constant SUTTER_MAX_MAX_TP_PULSES = 10000
Constant INVALID_SWEEP_NUMBER = -1

StrConstant PERCENT_F_MAX_PREC = "%.15f"

/// @name Igor Internal Abort Codes
///
/// @anchor IgorAbortCodes
///@{
Constant ABORTCODE_ABORTONRTE = -4
Constant ABORTCODE_ABORT = -3
Constant ABORTCODE_STACKOVERFLOW = -2
Constant ABORTCODE_USERABORT = -1
///@}
11 changes: 6 additions & 5 deletions Packages/MIES/MIES_NeuroDataWithoutBorders.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -366,13 +366,14 @@ threadsafe Function/DF NWB_ASYNC_Worker(DFREF dfr)
return $""
End

Function NWB_ASYNC_Readout(DFREF dfr, variable err, string errmsg)
Function NWB_ASYNC_Readout(STRUCT ASYNC_ReadOutStruct &ar)

if(!err)
return NaN
if(ar.rtErr)
BUG("Async jobs finished with RTE: " + ar.rtErrMsg)
endif
if(ar.abortCode)
BUG("Async jobs finished with Abort: " + GetErrMessage(ar.abortCode))
endif

BUG("Async jobs finished with: " + errmsg)
End

threadsafe static Function NWB_WriteLabnoteBooksAndComments(STRUCT NWBAsyncParameters &s)
Expand Down
8 changes: 8 additions & 0 deletions Packages/MIES/MIES_Structures.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -637,3 +637,11 @@ Structure SF_PlotMetaData
string xAxisLabel // from SF_META_XAXISLABEL constant
string yAxisLabel // from SF_META_YAXISLABEL constant
EndStructure

/// @brief ReadOut Structure for ASYNC
Structure ASYNC_ReadOutStruct
DFREF dfr // dfr with output data
variable rtErr // runtime error code
string rtErrMsg // runtime error message
variable abortCode // abort code
EndStructure
19 changes: 7 additions & 12 deletions Packages/MIES/MIES_TestPulse.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -179,26 +179,21 @@ End
/// results are base line level, steady state resistance, instantaneous resistance and their positions
/// collected results for all channels of a measurement are send to TP_RecordTP(), DQ_ApplyAutoBias() when complete
///
/// @param dfr output data folder from ASYNC frame work with results from workloads associated with this registered function
/// The output parameter in the data folder follow the definition as created in TP_TSAnalysis()
///
/// @param err error code of TP_TSAnalysis() function
///
/// @param errmsg error message of TP_TSAnalysis() function
Function TP_ROAnalysis(dfr, err, errmsg)
DFREF dfr
variable err
string errmsg
/// @param ar ASYNC_ReadOutStruct structure with dfr with input data
Function TP_ROAnalysis(STRUCT ASYNC_ReadOutStruct &ar)

variable i, j, bufSize
variable posMarker, posAsync, tpBufferSize
variable posBaseline, posSSRes, posInstRes
variable posElevSS, posElevInst

if(err)
ASSERT(0, "RTError " + num2str(err) + " in TP_Analysis thread: " + errmsg)
if(ar.rtErr || ar.abortCode)
ASSERT(!ar.rtErr, "TP analysis thread encountered RTE " + ar.rtErrMsg)
ASSERT(!ar.abortCode, "TP analysis thread aborted with code: " + GetErrMessage(ar.abortCode))
endif

DFREF dfr = ar.dfr

WAVE/SDFR=dfr inData = outData
NVAR/SDFR=dfr now = now
NVAR/SDFR=dfr hsIndex = hsIndex
Expand Down
Loading

0 comments on commit 939b261

Please sign in to comment.