-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathSRReceiver.java
110 lines (90 loc) · 3.63 KB
/
SRReceiver.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import java.io.FileOutputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
/**
 * Receiving side of a Selective Repeat file transfer over UDP.
 *
 * <p>Data packets whose sequence number falls inside the current receive window are
 * acknowledged and buffered; once the packet at the window base arrives, every
 * consecutive buffered packet is written to the output file and the window slides
 * forward. Packets from the previous window are re-acknowledged (the earlier ACK may
 * have been lost) but not stored. The session ends when an EOT packet is received.
 *
 * <p>Sequence numbers live in {@code [0, SEQNUM_MODULO)} and wrap around.
 */
public class SRReceiver {
    private static final int BUFFER_SIZE = 512;
    private static final int SEQNUM_MODULO = 256;
    private static final int WINDOW_SIZE = 10;

    /** Packet type value treated as a data packet. */
    private static final int TYPE_DATA = 0;
    /** Packet type value treated as end-of-transmission. */
    private static final int TYPE_EOT = 2;

    /** Sequence number of the oldest packet not yet written to the file. */
    private int base;
    /** Out-of-order packets received inside the window, keyed by sequence number. */
    private final Map<Integer, Packet> map;
    private final DatagramSocket socket;
    private final FileOutputStream fout;
    /** Address and port that ACKs are sent to; captured from the first datagram received. */
    private InetAddress channelAddress;
    private int channelPort;
    private boolean getChannelInfo;

    /**
     * Creates a receiver that reads from {@code socket} and writes the payload to {@code file}.
     *
     * @param socket bound UDP socket to receive on; it is closed when {@link #start()} returns
     * @param file   path of the output file; opened (and truncated) immediately
     * @throws Exception if the output file cannot be opened
     */
    SRReceiver(DatagramSocket socket, String file) throws Exception {
        this.socket = socket;
        fout = new FileOutputStream(file);
        base = 0;
        getChannelInfo = false;
        map = new HashMap<>();
    }

    /**
     * Checks whether {@code ackNum} falls in the receiver's current window,
     * i.e. within {@code WINDOW_SIZE} sequence numbers starting at {@code base}
     * (modulo {@code SEQNUM_MODULO}).
     */
    private boolean withinWindow(int ackNum) {
        int distance = ackNum - base;
        if (ackNum < base) {
            // sequence number has wrapped around past the modulus
            distance += SEQNUM_MODULO;
        }
        return distance < WINDOW_SIZE;
    }

    /**
     * Checks whether {@code ackNum} falls in the receiver's previous window,
     * i.e. the {@code WINDOW_SIZE} sequence numbers immediately before {@code base}
     * (modulo {@code SEQNUM_MODULO}).
     */
    private boolean withinPrevWindow(int ackNum) {
        int distance = base - ackNum;
        if (base < ackNum) {
            // base has wrapped around while ackNum has not
            distance += SEQNUM_MODULO;
        }
        return distance <= WINDOW_SIZE && distance > 0;
    }

    /**
     * Runs the receive loop until an EOT packet arrives, then closes the output file
     * and the socket.
     *
     * <p>The file and socket are closed even when the loop terminates with an
     * exception, so a failed transfer does not leak the file handle or the UDP port.
     *
     * @throws Exception if receiving, sending an ACK, or writing the file fails
     */
    public void start() throws Exception {
        try {
            byte[] buffer = new byte[BUFFER_SIZE];
            DatagramPacket receiveDatagram = new DatagramPacket(buffer, buffer.length);
            System.out.println("Start to receive data");

            while (true) {
                socket.receive(receiveDatagram);
                // NOTE(review): getData() is the whole backing buffer, not just the bytes
                // received; presumably Packet.getPacket uses its own length field — confirm.
                Packet packet = Packet.getPacket(receiveDatagram.getData());

                // Remember where the first datagram came from; all ACKs go back there.
                if (!getChannelInfo) {
                    channelAddress = receiveDatagram.getAddress();
                    channelPort = receiveDatagram.getPort();
                    getChannelInfo = true;
                }

                int type = packet.getType();
                if (type == TYPE_EOT) {
                    // end receiver session when receiving EOT
                    Util.endReceiverSession(packet, channelAddress, channelPort, socket);
                    break;
                }
                if (type == TYPE_DATA) {
                    handleDataPacket(packet);
                }
                // any other packet type is ignored
            }

            System.out.println("Finish receiving file");
        } finally {
            // Close the file first, then the socket; the nested finally guarantees the
            // socket is released even if closing the file throws.
            try {
                fout.close();
            } finally {
                socket.close();
            }
        }
    }

    /** Acknowledges a data packet and buffers/delivers it according to the window state. */
    private void handleDataPacket(Packet packet) throws Exception {
        System.out.println(String.format("PKT RECV DAT %s %s", packet.getLength(), packet.getSeqNum()));
        int seqNum = packet.getSeqNum();

        if (withinWindow(seqNum)) {
            // send ACK back to sender
            Util.sendACK(seqNum, channelAddress, channelPort, socket);
            // buffer the packet unless a copy is already held (duplicate delivery)
            if (!map.containsKey(seqNum)) {
                map.put(seqNum, packet);
            }
            if (seqNum == base) {
                deliverInOrder();
            }
        } else if (withinPrevWindow(seqNum)) {
            // already delivered, but the sender is still retransmitting: ACK again
            Util.sendACK(seqNum, channelAddress, channelPort, socket);
        }
    }

    /**
     * Writes every consecutively buffered packet starting at {@code base} to the file,
     * removes it from the buffer, and advances {@code base} past the last one written.
     */
    private void deliverInOrder() throws Exception {
        int next = base;
        while (map.containsKey(next)) {
            fout.write(map.remove(next).getData());
            next = (next + 1) % SEQNUM_MODULO;
        }
        base = next;
    }
}