Skip to content

Commit

Permalink
Optimzation for detecting connection failures (see issue dallmann-con…
Browse files Browse the repository at this point in the history
  • Loading branch information
dallmann-consulting committed Nov 7, 2024
1 parent e37ab01 commit 6a62e2c
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 125 deletions.
129 changes: 67 additions & 62 deletions OCPP.Core.Server/OCPPMiddleware.OCPP16.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,98 +50,103 @@ private async Task Receive16(ChargePointStatus chargePointStatus, HttpContext co
byte[] buffer = new byte[1024 * 4];
MemoryStream memStream = new MemoryStream(buffer.Length);

while (chargePointStatus.WebSocket.State == WebSocketState.Open)
try
{
WebSocketReceiveResult result = await chargePointStatus.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
if (result != null && result.MessageType != WebSocketMessageType.Close)
while (chargePointStatus.WebSocket.State == WebSocketState.Open)
{
logger.LogTrace("OCPPMiddleware.Receive16 => Receiving segment: {0} bytes (EndOfMessage={1} / MsgType={2})", result.Count, result.EndOfMessage, result.MessageType);
memStream.Write(buffer, 0, result.Count);

// max. allowed message size NOT exceeded - or limit deactivated?
if (maxMessageSizeBytes == 0 || memStream.Length <= maxMessageSizeBytes)
WebSocketReceiveResult result = await chargePointStatus.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
if (result != null && result.MessageType != WebSocketMessageType.Close)
{
if (result.EndOfMessage)
{
// read complete message into byte array
byte[] bMessage = memStream.ToArray();
// reset memory stream für next message
memStream = new MemoryStream(buffer.Length);

string ocppMessage = UTF8Encoding.UTF8.GetString(bMessage);
logger.LogTrace("OCPPMiddleware.Receive16 => Receiving segment: {0} bytes (EndOfMessage={1} / MsgType={2})", result.Count, result.EndOfMessage, result.MessageType);
memStream.Write(buffer, 0, result.Count);

// write message (async) to dump directory
_ = Task.Run(() =>
{
DumpMessage("ocpp16-in", ocppMessage);
});

Match match = Regex.Match(ocppMessage, MessageRegExp);
if (match != null && match.Groups != null && match.Groups.Count >= 3)
// max. allowed message size NOT exceeded - or limit deactivated?
if (maxMessageSizeBytes == 0 || memStream.Length <= maxMessageSizeBytes)
{
if (result.EndOfMessage)
{
string messageTypeId = match.Groups[1].Value;
string uniqueId = match.Groups[2].Value;
string action = match.Groups[3].Value;
string jsonPaylod = match.Groups[4].Value;
logger.LogInformation("OCPPMiddleware.Receive16 => OCPP-Message: Type={0} / ID={1} / Action={2})", messageTypeId, uniqueId, action);
// read complete message into byte array
byte[] bMessage = memStream.ToArray();
// reset memory stream für next message
memStream = new MemoryStream(buffer.Length);

OCPPMessage msgIn = new OCPPMessage(messageTypeId, uniqueId, action, jsonPaylod);
string ocppMessage = UTF8Encoding.UTF8.GetString(bMessage);

// Send raw incoming messages to extensions
// write message (async) to dump directory
_ = Task.Run(() =>
{
ProcessRawIncomingMessageSinks(chargePointStatus.Protocol, chargePointStatus.Id, msgIn);
DumpMessage("ocpp16-in", ocppMessage);
});

if (msgIn.MessageType == "2")
Match match = Regex.Match(ocppMessage, MessageRegExp);
if (match != null && match.Groups != null && match.Groups.Count >= 3)
{
// Request from chargepoint to OCPP server
OCPPMessage msgOut = controller16.ProcessRequest(msgIn);
string messageTypeId = match.Groups[1].Value;
string uniqueId = match.Groups[2].Value;
string action = match.Groups[3].Value;
string jsonPaylod = match.Groups[4].Value;
logger.LogInformation("OCPPMiddleware.Receive16 => OCPP-Message: Type={0} / ID={1} / Action={2})", messageTypeId, uniqueId, action);

// Send OCPP message with optional logging/dump
await SendOcpp16Message(msgOut, logger, chargePointStatus);
}
else if (msgIn.MessageType == "3" || msgIn.MessageType == "4")
{
// Process answer from chargepoint
if (_requestQueue.ContainsKey(msgIn.UniqueId))
OCPPMessage msgIn = new OCPPMessage(messageTypeId, uniqueId, action, jsonPaylod);

// Send raw incoming messages to extensions
_ = Task.Run(() =>
{
controller16.ProcessAnswer(msgIn, _requestQueue[msgIn.UniqueId]);
_requestQueue.Remove(msgIn.UniqueId);
ProcessRawIncomingMessageSinks(chargePointStatus.Protocol, chargePointStatus.Id, msgIn);
});

if (msgIn.MessageType == "2")
{
// Request from chargepoint to OCPP server
OCPPMessage msgOut = controller16.ProcessRequest(msgIn);

// Send OCPP message with optional logging/dump
await SendOcpp16Message(msgOut, logger, chargePointStatus);
}
else if (msgIn.MessageType == "3" || msgIn.MessageType == "4")
{
// Process answer from chargepoint
if (_requestQueue.ContainsKey(msgIn.UniqueId))
{
controller16.ProcessAnswer(msgIn, _requestQueue[msgIn.UniqueId]);
_requestQueue.Remove(msgIn.UniqueId);
}
else
{
logger.LogError("OCPPMiddleware.Receive16 => HttpContext from caller not found / Msg: {0}", ocppMessage);
}
}
else
{
logger.LogError("OCPPMiddleware.Receive16 => HttpContext from caller not found / Msg: {0}", ocppMessage);
// Unknown message type
logger.LogError("OCPPMiddleware.Receive16 => Unknown message type: {0} / Msg: {1}", msgIn.MessageType, ocppMessage);
}
}
else
{
// Unknown message type
logger.LogError("OCPPMiddleware.Receive16 => Unknown message type: {0} / Msg: {1}", msgIn.MessageType, ocppMessage);
logger.LogWarning("OCPPMiddleware.Receive16 => Error in RegEx-Matching: Msg={0})", ocppMessage);
}
}
else
{
logger.LogWarning("OCPPMiddleware.Receive16 => Error in RegEx-Matching: Msg={0})", ocppMessage);
}
}
else
{
// max. allowed message size exceeded => close connection (DoS attack?)
logger.LogInformation("OCPPMiddleware.Receive16 => Allowed message size exceeded - close connection");
await chargePointStatus.WebSocket.CloseOutputAsync(WebSocketCloseStatus.MessageTooBig, string.Empty, CancellationToken.None);
}
}
else
{
// max. allowed message size exceeded => close connection (DoS attack?)
logger.LogInformation("OCPPMiddleware.Receive16 => Allowed message size exceeded - close connection");
await chargePointStatus.WebSocket.CloseOutputAsync(WebSocketCloseStatus.MessageTooBig, string.Empty, CancellationToken.None);
logger.LogInformation("OCPPMiddleware.Receive16 => WebSocket Closed: CloseStatus={0} / MessageType={1}", result?.CloseStatus, result?.MessageType);
await chargePointStatus.WebSocket.CloseOutputAsync((WebSocketCloseStatus)3001, string.Empty, CancellationToken.None);
}
}
else
{
logger.LogInformation("OCPPMiddleware.Receive16 => WebSocket Closed: CloseStatus={0} / MessageType={1}", result?.CloseStatus, result?.MessageType);
await chargePointStatus.WebSocket.CloseOutputAsync((WebSocketCloseStatus)3001, string.Empty, CancellationToken.None);
}
}
logger.LogInformation("OCPPMiddleware.Receive16 => Websocket closed: State={0} / CloseStatus={1}", chargePointStatus.WebSocket.State, chargePointStatus.WebSocket.CloseStatus);
ChargePointStatus dummy;
_chargePointStatusDict.Remove(chargePointStatus.Id, out dummy);
finally
{
logger.LogInformation("OCPPMiddleware.Receive16 => Websocket closed: State={0} / CloseStatus={1}", chargePointStatus.WebSocket.State, chargePointStatus.WebSocket.CloseStatus);
_chargePointStatusDict.Remove(chargePointStatus.Id);
}
}

/// <summary>
Expand Down
Loading

0 comments on commit 6a62e2c

Please sign in to comment.