-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathSRSender.java
129 lines (107 loc) · 4.2 KB
/
SRSender.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import java.io.FileInputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.Deque;
import java.util.ArrayDeque;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.Semaphore;
/**
 * Sends a file over UDP using a sliding window of individually timed packets.
 *
 * <p>The calling thread reads the file in {@code BUFFER_SIZE} chunks and sends each
 * chunk as a {@code Packet} wrapped in a {@code TimerPacket}. A second thread
 * receives ACKs, stops the timer of the acknowledged packet and slides the window
 * forward when the ACK matches the window base. At most {@code WINDOW_SIZE}
 * packets are outstanding at any time; the {@link Semaphore} enforces that bound.
 *
 * <p>The window state ({@code queue}, {@code map}, {@code base}) is shared between
 * the two threads and is only accessed while holding {@code windowLock}.
 *
 * <p>NOTE(review): the static fields {@code socket}, {@code channelAddress},
 * {@code port} and {@code timeout} are kept package-visible and static because
 * other project classes (presumably {@code TimerPacket}) may read them — confirm
 * before narrowing their visibility.
 */
public class SRSender {
    private static final int ACK_SIZE = 12;
    private static final int BUFFER_SIZE = 500;
    private static final int HEADER_SIZE = 12;
    private static final int SEQNUM_MODULO = 256;
    /** Maximum number of sent-but-unacknowledged packets. */
    private static final int WINDOW_SIZE = 10;
    /** How long a blocking receive waits before the loop re-checks its exit condition. */
    private static final int RECEIVE_POLL_MILLIS = 100;

    private final Semaphore available = new Semaphore(WINDOW_SIZE);
    /** Guards {@code queue}, {@code map} and {@code base}. */
    private final Object windowLock = new Object();
    private final FileInputStream fileStream;
    static DatagramSocket socket;
    static InetAddress channelAddress;
    static int port;
    private final Deque<TimerPacket> queue;
    private final Map<Integer, TimerPacket> map;
    static int timeout;
    private int base;
    private int nextSeqNum;
    private volatile boolean sendFinished;

    /**
     * Creates a sender for the given file and channel endpoint.
     *
     * @param file        path of the file to send
     * @param hostname    host name of the channel to send packets to
     * @param channelPort UDP port of the channel
     * @param t           value stored in the static {@code timeout} field
     * @throws Exception if the host cannot be resolved or the file cannot be opened
     */
    SRSender(String file, String hostname, int channelPort, int t) throws Exception {
        timeout = t;
        base = 0;
        nextSeqNum = 0;
        port = channelPort;
        channelAddress = InetAddress.getByName(hostname);
        queue = new ArrayDeque<>();
        map = new HashMap<>();
        // Open the file last so that a failed host lookup does not leak the stream.
        fileStream = new FileInputStream(file);
        sendFinished = false;
    }

    /** Returns whether any sent packet is still waiting in the window. */
    private boolean hasUnackedPackets() {
        synchronized (windowLock) {
            return !queue.isEmpty();
        }
    }

    /**
     * Receives ACK packets until the whole file has been sent and the window is empty.
     *
     * <p>The socket is expected to have a receive timeout set, so that this loop wakes
     * up periodically and re-evaluates its exit condition instead of blocking forever
     * when the final ACK was already processed before {@code sendFinished} was set.
     */
    private void receivePackets() {
        byte[] buffer = new byte[ACK_SIZE];
        DatagramPacket receiveDatagram = new DatagramPacket(buffer, buffer.length);
        while (!sendFinished || hasUnackedPackets()) {
            try {
                // get ack number
                socket.receive(receiveDatagram);
                Packet packet = Packet.getPacket(receiveDatagram.getData());
                System.out.println(String.format("PKT RECV ACK %s %s", packet.getLength(), packet.getSeqNum()));
                handleAck(packet.getSeqNum());
            } catch (java.net.SocketTimeoutException ignored) {
                // No ACK within the poll interval: loop around and re-check the exit condition.
            } catch (Exception e) {
                if (socket.isClosed()) {
                    // The sender closed the socket (e.g. after a failure); nothing more can arrive.
                    return;
                }
                System.out.println("Exception when receiving datagram packet");
            }
        }
    }

    /**
     * Stops the timer of the packet acknowledged by {@code ackNum} and, if that packet
     * is the window base, removes every leading acknowledged packet from the window.
     */
    private void handleAck(int ackNum) {
        synchronized (windowLock) {
            TimerPacket acked = map.get(ackNum);
            if (acked == null) {
                // ACK for a packet that is not (or no longer) in the window.
                return;
            }
            // presumably stopTimer() also marks the packet so that isAck() returns true — verify in TimerPacket
            acked.stopTimer();
            if (ackNum != base) {
                return;
            }
            // move forward the window past every leading acknowledged packet
            TimerPacket lastRemoved = acked;
            while (!queue.isEmpty() && queue.peek().isAck()) {
                lastRemoved = queue.poll();
                map.remove(lastRemoved.getPacket().getSeqNum());
                available.release();
            }
            base = (lastRemoved.getPacket().getSeqNum() + 1) % SEQNUM_MODULO;
        }
    }

    /**
     * Sends the whole file, waits until every packet has been acknowledged, then ends
     * the sender session. The socket and the file stream are closed on every exit path.
     *
     * @throws Exception if socket creation, file reading, sending or joining fails
     */
    public void start() throws Exception {
        try {
            // create socket to send and receive data
            socket = new DatagramSocket();
            try {
                // Bound each receive so the ACK thread can notice that sending has finished.
                socket.setSoTimeout(RECEIVE_POLL_MILLIS);

                // create new thread to receive ACK packets
                Thread receiveThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        receivePackets();
                    }
                });
                receiveThread.start();

                // send file data
                System.out.println("Start to send file data");
                while (true) {
                    // make packet with individual timer
                    byte[] buffer = new byte[BUFFER_SIZE];
                    int readNum = fileStream.read(buffer, 0, BUFFER_SIZE);
                    if (readNum < 0) {
                        sendFinished = true;
                        break;
                    }
                    Packet packet = new Packet(0, readNum + HEADER_SIZE, nextSeqNum, buffer);
                    TimerPacket timerPacket = new TimerPacket(packet);

                    // Wait for a free window slot outside the lock so the ACK thread can make progress.
                    available.acquire();
                    synchronized (windowLock) {
                        queue.offer(timerPacket);
                        map.put(nextSeqNum, timerPacket);
                    }
                    // send the packet and start its timer
                    timerPacket.startTimer();
                    Util.sendData(packet, channelAddress, port, socket);

                    // update nextSeqNum
                    nextSeqNum = (nextSeqNum + 1) % SEQNUM_MODULO;
                }

                // join the receive thread
                receiveThread.join();

                // Restore blocking receives in case the session-end helper waits on the socket.
                socket.setSoTimeout(0);
                // end sender session
                Util.endSenderSession(base, channelAddress, port, socket);
                System.out.println("Finish sending file");
            } finally {
                socket.close();
            }
        } finally {
            fileStream.close();
        }
    }
}