Skip to content

Commit

Permalink
arrays to lists
Browse files Browse the repository at this point in the history
  • Loading branch information
javicond3 committed Jan 4, 2021
1 parent be7e7cd commit 5e80fff
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package org.apache.nifi.processors.tcp.client;


import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -21,58 +22,67 @@ public class MessageHandler {
this.dle = dle;
}

// extract packet from byteArrayInput
public byte[] popFromByteArray(){
int EOP = -1;
for (int i = 0; i < this.byteArrayInput.length -1; i++) {
if(this.byteArrayInput[i] == this.dle) {
if(this.byteArrayInput[i+1] == this.dle) {
i++;
} else if(this.byteArrayInput[i+1] == this.etx) {
// limited found, end of packet
EOP = i;
break;
}
}
}
if (EOP != -1) {
// entire packet without EOP
byte[] packet = Arrays.copyOfRange(this.byteArrayInput, 0, EOP);

//pop the content of packet form byteArrayInput
if(EOP+2 < this.byteArrayInput.length) {
this.byteArrayInput = Arrays.copyOfRange(this.byteArrayInput, EOP+2, this.byteArrayInput.length);
} else {
this.byteArrayInput = new byte[0];
}
if(packet[0] != this.dle && packet[0] != this.stx) {
logger.warn("Popped a packet without a valid start delimiter");
return null;
}
// Delete DLE STX
packet = Arrays.copyOfRange(packet, 2, packet.length);
packet = this.deStuff(packet);
return packet;

}

return null;
public List<Byte> popPacketFromBuffer(List<Byte> buffer){
int bufferSize = buffer.size();

// Return on small packet length
if (bufferSize < 4) return null;

if (buffer.get(bufferSize - 2) == dle && buffer.get(bufferSize - 1) == etx){

// Check to see if we have an odd number of dle packets
// An Even number will signify a dle stuffed mid packet!
int dleCount = 0;
for (int i = bufferSize - 2; i >= 0; i--){
if (buffer.get(i) == dle){
dleCount += 1;
}else{
break;
}
}

if (dleCount % 2 == 0){
// dleCount is an even number so this is not the end of a packet
return null;
}


// Ensure packet begins with a valid startDelimiter
if (buffer.get(0) != dle && buffer.get(1) != stx){
System.out.println("Popped a packet without a valid start delimiter");
return null;
}

return deStuffPacket(buffer);
}

return null;
}

//delete duplicate delimiters from the packet:
//dle dle to dle
public byte[] deStuff(byte[] byteArray) {
byte[] byteArrayOut = new byte[byteArray.length];
int o = 0;
for (int i = 0; i < byteArray.length; i++, o++) {
if(byteArray[i] == this.dle && i < byteArray.length -1 && byteArray[i+1] == this.dle) {
byteArrayOut[o] = byteArray[i];
i++;
}else {
byteArrayOut[o] = byteArray[i];
}
}
return Arrays.copyOfRange(byteArrayOut, 0, o);



public List<Byte> deStuffPacket(List<Byte> stuffedPacket){
int lengthOfBuffer = stuffedPacket.size();

List <Byte> buffer = new ArrayList<Byte>();
for (int i = 0; i < lengthOfBuffer - 1; i++){
if (stuffedPacket.get(i) == dle && stuffedPacket.get(i + 1) == dle){
// Write a single dle byte!
buffer.add((byte) dle);
i++;
} else if (stuffedPacket.get(i) == dle && stuffedPacket.get(i + 1) == stx){
// Skip the header from the framing
i++;
} else if (stuffedPacket.get(i) == dle && stuffedPacket.get(i + 1) == etx){
// Skip the footer from the framing
i++;
} else {
// Write the byte as it's ok!
buffer.add(stuffedPacket.get(i));
}
}

return buffer;
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
Expand All @@ -29,19 +29,21 @@ public class SendingReceivingClient {
private final int port;
private final String customText;
private final int readSecondsTimeout;
private final int receiveBufferSize;
private final Relationship REL_SUCCESS;
private final ProcessSessionFactory sessionFactory;
private final SSLContextService sslContextService;
private final MessageHandler delegatingMessageHandler;
private volatile SSLSocket sslSocket;
private Thread threadClient;

public SendingReceivingClient(String host, int port, String customText, int readSecondsTimeout,
public SendingReceivingClient(String host, int port, String customText, int readSecondsTimeout, int receiveBufferSize,
Relationship REL_SUCCESS, ProcessSessionFactory sessionFactory, SSLContextService sslContextService, MessageHandler delegatingMessageHandler) {
this.host = host;
this.port = port;
this.customText = customText;
this.readSecondsTimeout = readSecondsTimeout;
this.receiveBufferSize = receiveBufferSize;
this.REL_SUCCESS = REL_SUCCESS;
this.sessionFactory = sessionFactory;
this.sslContextService = sslContextService;
Expand Down Expand Up @@ -84,29 +86,40 @@ private void process() {
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream);

bufferedOutputStream.write((this.customText.toString()+"\r\n").getBytes());
bufferedOutputStream.flush();
while (true && !this.sslSocket.isClosed()) {
byte[] buffer = new byte[40];

try {
int bytesRead = bufferedInputStream.read(buffer);
buffer = Arrays.copyOfRange(buffer, 0, bytesRead);
delegatingMessageHandler.byteArrayInput = ArrayUtils.addAll(delegatingMessageHandler.byteArrayInput, buffer);
if(bytesRead < 0) {
this.stop();
return;
}
byte[] packet = null;
while((packet = this.delegatingMessageHandler.popFromByteArray()) != null) {
this.handle(packet);

}
} catch (SocketTimeoutException e) {
bufferedOutputStream.flush();

List<Byte> buffer = new ArrayList<Byte>();
while (!this.sslSocket.isClosed()) {
try {
int lastByte = bufferedInputStream.read();
buffer.add((byte)lastByte);
if (lastByte < 0) {
sslSocket.close();
return;
}

if (buffer.size() >= receiveBufferSize) {
buffer.clear();
}
List<Byte>packet = this.delegatingMessageHandler.popPacketFromBuffer(buffer);
if (packet != null){
Byte[] packetArray = new Byte[packet.size()];
packet.toArray(packetArray);
byte[] packetArrayByte = new byte[packet.size()];
for(int i = 0; i<packet.size(); i++) {
packetArrayByte[i] = packetArray[i].byteValue();
}
this.handle(packetArrayByte);
buffer.clear();
}

} catch (SocketTimeoutException e) {
logger.error("Socket timed out", e);
this.stop();
return;
}
}

}
} catch (IOException e) {
logger.error("Socket process error", e);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,9 @@ public void onScheduled(final ProcessContext context) {

@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {

if (this.client == null) {
this.client = new SendingReceivingClient(
this.host, this.port, this.customText, this.readSecondsTimeout, REL_SUCCESS,
this.host, this.port, this.customText, this.readSecondsTimeout, this.receiveBufferSize, REL_SUCCESS,
sessionFactory, this.sslContextService,
new MessageHandler(this.stx, this.etx, this.dle)
);
Expand Down

0 comments on commit 5e80fff

Please sign in to comment.