本文为斯坦福大学计算机网络课程 CS144 编程任务 Lab Assignment 3 的学习小结
官网 https://cs144.github.io/
Lab 3 文档 https://cs144.github.io/assignments/lab3.pdf
个人实验备份代码 https://github.com/deepzheng/sponge
The TCP Sender
我们所实现的 TCPSender 需要具备如下功能:
- 跟踪接收端的窗口大小,并根据接收端的窗口尺寸调整发送速率(流量控制)
- 尽可能填充发送窗口并发送报文段直到窗口已满或发送数据完成
- 跟踪已发送但尚未被接收的报文段,并在合适的时机重传(超时重传)
本次任务我们主要需要完成以下几个函数
fill_window()
将输入的 ByteStream 尽可能填充到发送窗口中,
ack_received( const WrappingInt32 ackno, const uint16_t window_size)
从接收者发送来的确认报文中,获取确认序列号和接收窗口大小便于 Sender 删除驻留在队列中已经发送但尚未被确认的报文,并调整发送窗口的大小
tick( const size_t ms_since_last_tick )
唯一的调用接口来检查已发送但尚未被接收的报文是否发生超时,若超时将执行重传
send_empty_segment()
发送一段字节长度为 0 的报文段,这样的空报文段不需要被标记为未确认段(outstanding)并且不需要启动计时器也不会被重传
以下是实现思路以及一些小细节
用队列存储发送报文段,同样用一个队列来存储发送后等待被确认的报文段(空报文段不需要被推入该队列)
在连接建立时,TCPSender 会发送一个只含有 SYN 信号的报文段,且该报文段只用于握手,而 FIN 信号可以包含在含有数据的报文段中。当接收端接收到 FIN 信号且成功返回对于 FIN 的确认后,连接才正式关闭。因此,当 SYN 尚未被确认且窗口非 0 时,此时仍然不能发送数据
TCP 实行的是累计确认,即 TCP 只确认该流中至第一个丢失字节为止的字节。 在接收到确认序号 ackno 后,意味着在该序号前的所有字节已经被确认,就可以把这些字节在等待队列中删除。同时,如果发生超时重传,只需要重传等待队列队头的报文即可
对于发送方的 windows size ,还有一个问题需要注意。就是当接收端缓存已满(即窗口大小为 0 )时,将这一信息返回给发送方,并且刚好发送方也没有任何数据要发送。这时候就会出现一个很尴尬的情况,就是发送方接收不到任何来自接收端窗口大小变化的通知(因为 TCP 仅当有数据或者确认要发时才会发送报文段,而窗口大小的信息是附在这些报文段中的)。所以此时发送端相当于被阻塞了。对于这种情况,TCP 规定将窗口大小视为 1 ,在接收端窗口为 0 时继续发送大小为 1 的报文段。这些报文段最终将被接收并返回新的窗口大小。所以在代码实现中加入了 _win_zero_flag
变量
if (_win_size == 0) {
_win_zero_flag = true;
_win_size = 1;
} else {
_win_zero_flag = false;
}
当反馈回来的窗口大小为 0 时,强制调整为 1 ,并将 _win_zero_flag
设为 true 用于辅助判断在超时重传时是否需要将重传时间倍增
总的实现代码如下
tcp_sender.hh
class TCPSender {
private:
//! our initial sequence number, the number for our SYN.
WrappingInt32 _isn;
//! outbound queue of segments that the TCPSender wants sent
std::queue<TCPSegment> _segments_out{};
std::queue<TCPSegment> _segments_waiting{};
//! retransmission timer for the connection
unsigned int _initial_retransmission_timeout;
ByteStream _stream;
uint64_t _next_seqno{0}; // 下一个将被发送的字节序号
uint64_t _ack_seqno{0}; // 最大的被确认的字节序号
uint64_t _win_size{1};
size_t _consecutive_retransmission{0};
unsigned int _total_time{0};
unsigned int _retransmisson_timeout{0};
bool _is_timer_running{false};
bool _is_syn{false};
bool _is_fin{false};
bool _win_zero_flag{false};
public:
//! Initialize a TCPSender
TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
const std::optional<WrappingInt32> fixed_isn = {});
ByteStream &stream_in() { return _stream; }
const ByteStream &stream_in() const { return _stream; }
std::queue<TCPSegment> &segments_waiting() { return _segments_waiting; }
const std::queue<TCPSegment> &segments_waiting() const { return _segments_waiting; }
void ack_received(const WrappingInt32 ackno, const uint16_t window_size);
void send_empty_segment();
void fill_window();
void tick(const size_t ms_since_last_tick);
size_t bytes_in_flight() const;
unsigned int consecutive_retransmissions() const;
std::queue<TCPSegment> &segments_out() { return _segments_out; }
uint64_t next_seqno_absolute() const { return _next_seqno; }
WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }
};
tcp_sender.cc
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
, _initial_retransmission_timeout{retx_timeout}
, _stream(capacity)
, _retranmisson_timeout(retx_timeout) {}
uint64_t TCPSender::bytes_in_flight() const { return _next_seqno - _ack_seqno; }
void TCPSender::fill_window() {
if (_win_size == 0 || _is_fin) {
return;
}
TCPSegment seg;
if (!_is_syn) {
// only send a syn flag in order to handshake
_is_syn = true;
seg.header().syn = true;
seg.header().seqno = next_seqno();
_next_seqno++;
_win_size--;
segments_out().push(seg);
segments_waiting().push(seg);
} else if (stream_in().eof()) {
// only send a fin flag
_is_fin = true;
seg.header().fin = true;
seg.header().seqno = next_seqno();
_next_seqno++;
_win_size--;
segments_out().push(seg);
segments_waiting().push(seg);
} else {
// send the normal payload
while (_win_size > 0 && !stream_in().buffer_empty()) {
seg.header().seqno = next_seqno();
uint64_t send_len = min(_win_size, min(TCPConfig::MAX_PAYLOAD_SIZE, stream_in().buffer_size()));
seg.payload() = stream_in().read(send_len);
if (seg.length_in_sequence_space() < _win_size && stream_in().eof()) {
_is_fin = true;
seg.header().fin = true;
}
_next_seqno += seg.length_in_sequence_space();
_win_size -= seg.length_in_sequence_space();
segments_out().push(seg);
segments_waiting().push(seg);
}
}
if (!_is_timer_running) {
// start timer
_is_timer_running = true;
_total_time = 0;
}
}
//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
uint64_t ack_abs = unwrap(ackno, _isn, _next_seqno);
_win_size = window_size;
if (ack_abs > _next_seqno || ack_abs <= _ack_seqno)
return;
_retranmisson_timeout = _initial_retransmission_timeout;
_consecutive_retransmission = 0;
while (!segments_waiting().empty()) {
TCPSegment seg = segments_waiting().front();
uint64_t seqno = unwrap(seg.header().seqno, _isn, _next_seqno) + seg.length_in_sequence_space();
if (seqno <= ack_abs) {
_ack_seqno = seqno;
segments_waiting().pop();
} else
break;
}
if (!segments_waiting().empty()) {
// restart timer
_total_time = 0;
_is_timer_running = true;
}
//如果没有发送但未确认的报文,关闭计时器
if (bytes_in_flight() == 0) {
_is_timer_running = false;
}
if (bytes_in_flight() > window_size) {
//当发送中的数据大于窗口尺寸时,暂停发送新数据
_win_size = 0;
_win_zero_flag = true;
return;
}
if (_win_size == 0) {
_win_zero_flag = true;
_win_size = 1;
} else {
_win_zero_flag = false;
}
fill_window();
}
//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
_total_time += ms_since_last_tick;
if (_total_time >= _retransmisson_timeout && !segments_waiting().empty()) {
// only retransmit the head of queue
segments_out().push(segments_waiting().front());
if (!_win_zero_flag) {
_consecutive_retransmission++;
_retranmisson_timeout *= 2;
}
_total_time = 0;
}
if (segments_waiting().empty()) {
_is_timer_running = false;
}
}
unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmission; }
void TCPSender::send_empty_segment() {
TCPSegment seg;
seg.header().seqno = next_seqno();
segments_out().push(seg);
}
小结
这次的 Lab 明显比前几次的难度提升了一个档次。完全读通文档的要求就挺费精力的了,等到具体的代码实现,又开始不知所措了。勉勉强强写完最后又是一堆错误,连写带调(中间也参考了一下别人的思路)花掉了快两天时间,终于过了测试,虽然感觉自己的代码非常的丑陋qwq。不得不说,随着实验的深入,要考虑的细节越来越多了,虽然只是个简易的 TCP,但是对于各种边边角角的情况都需要考虑到,这几天基本就指着测试来帮我找坑了。。
终于只剩下最后一个 Lab 了,虽然 Lab4 的变态程度早已听闻。。不过不要怕,胜利就在眼前,干就完事了!