本文为斯坦福大学计算机网络课程 CS144 编程任务 Lab Assignment 4 的学习小结
官网 https://cs144.github.io/
Lab 4 文档 https://cs144.github.io/assignments/lab4.pdf
个人实验备份代码 https://github.com/deepzheng/sponge
TCPConnection 需要完成以下三大功能
-
接收报文
-
如果接收到的报文段设置了重置标志
RST
,将入栈和出栈的流都设成错误状态并 kill 掉连接 - 将报文段提供给 TCPReceiver 做进一步的处理
- 如果接收到的报文段设置了确认标志
ACK
,将ackno
和window_size
字段传送给 TCPSender - 如果传入的报文段占用了序列号,TCPConnection 应该确保至少发送一个用于回复的报文段以确保对方的
ackno
和window_size
能得到更新
-
-
发送报文
- 任何时候 TCPSender 都将报文段推向其传出队列,设置字段:(
seqno
,SYN
,payload
和FIN
) - 在发送报文段之前,TCPConnection 将询问 Tcreceiver 为其负责传出段的字段:ackno 和 window_size。如果有一个ackno,它将设置 ACK 标志和TCPSegment 中的字段
- 任何时候 TCPSender 都将报文段推向其传出队列,设置字段:(
-
记录传输时间
- 调用
tick
方法告诉 TCPSender 自从上次调用后经过的时间 - 若连续重传数量超过上限,终止连接,发送一个重置段(带有
RST
标志的空段) - 关闭连接若必要的话
- 调用
上图就是 TCP 报文段的所有内容了,蓝色部分代表发送方需要传输和设置的内容,红色部分则代表作为接收者所反馈给对方的信息
任务书到这里就结束了,往下就是 FAQs 的部分了。看起来很简单是吧,只要把之前做过的 Receiver 和 Sender 拼拼凑凑就好了。我们试着跑一下 check
好家伙 162 个测试点,果然有点东西。。。
不管了,饭要一口一口吃,路要一步一步走,先把以前实现的能直接调用的 api 先填上去吧
size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity() }
size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); }
size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); }
虽然不知道对不对吧。。。先跑个测试看看(信息太多了还得输出到 log 文件里面看2333)
包含有 SYN 的连接请求是发送了,但是并没有对这个 SYN 的确认报文,所以其实这个 connect
还应该有包含确认的应答?并且看 warning:Unclean shutdown of TCPConnection,所以在发完一个报文之后需要一个 clean 的 shutdown。
那么什么是 clean shutdown 什么是 unclean shutdown 呢?在第五节任务书告诉我们
在 unclean shutdown 情况下,发送或接收的报文段包会含有 RST 的标志,在这个时候,出站和入站的字节流都是处于一个错误状态,并且 active()
也处于 false 状态
在 clean shutdown 情况下呢,自然就是在没有错误的情况下关闭连接了。什么时候可以 clean shutdown 呢?
- 入站字节流已经被完整组装
- 出站字节流已经被全部发送(包括 FIN )
- 出站字节流被接收方全部确认
- 本地 TCPConnection 确保远端也满足条件 3,具体实现呢,他给了两种方法
- 当条件 1 到 3 都满足并且至少 10 倍的初始重传时间内本地都没有收到对方的新报文
- 被动关闭,但是这个我看的不是太懂
Prerequisites #1 through #3 are true, and the local peer is 100% certain that the remote peer can satisfy prerequisite #3. How can this be, if TCP doesn’t acknowledge acknowledgments? Because the remote peer was the first one to end its stream.
除此之外,还有一个 _linger_after_streams_finish
变量用于指示在出入流都结束后是否还需要保持 active 状态直到 10 倍重传时间之后。5.1 中提到,当入站流在出站流到达 EOF 之前结束的话,该变量需要设置成 false
明白了,这就把这两个函数写出来
void TCPConnection::unclean_shutdown(){
_active = false;
_sender.stream_in().set_error();
_receiver.stream_out().set_error();
if(!_sender.segments_out().empty()){
TCPSegment rst_seg = _sender.segments_out().front();
rst_seg.header().rst = true;
_segments_out.push(rst_seg);
_sender.segments_out().pop();
}else{
//若队列中没有报文段了需要生成一个空报文段来发送
_sender.send_empty_segment();
TCPSegment rst_seg = _sender.segments_out().front();
rst_seg.header().rst = true;
_segments_out.push(rst_seg);
_sender.segments_out().pop();
}
}
void TCPConnection::clean_shutdown(){
if(_receiver.stream_out().input_ended() && !_sender.stream_in().eof()){
_linger_after_streams_finish = false;
}
if(_receiver.stream_out().input_ended() && _sender.stream_in().eof() && _sender.segments_out().empty() && !_sender.bytes_in_flight()){
if(!_linger_after_streams_finish || time_since_last_segment_received() >= 10*_cfg.rt_timeout){
_active = false;
}
}
}
在 unclean shutdown 情况下,发送方还需要发送一个包含 RST 的报文段,但是在 clean shutdown 情况下,本来就是已经所有的报文段发送完了才 clean 的,自然不需要再发新的段了,把 active 状态变成 false 就够了
然后在析构函数里面,根据提示把unclean shutdown 填进去就行了
TCPConnection::~TCPConnection() {
try {
if (active()) {
cerr << "Warning: Unclean shutdown of TCPConnection\n";
unclean_shutdown();
// Your code here: need to send a RST segment to the peer
}
} catch (const exception &e) {
std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
}
}
再然后还有啥能写的呢。write 好像看起来比较简单,先把这个写了吧
size_t TCPConnection::write(const string &data) {
size_t write_length = _sender.stream_in().write(data);
_sender.fill_window();
while (!_sender.segments_out().empty())
{
TCPSegment seg = _sender.segments_out().front();
if(_receiver.ackno().has_value()){
seg.header().ack = true;
seg.header().ackno = _receiver.ackno().value();
seg.header().win = _receiver.window_size();
}
_segments_out.push(seg);
_sender.segments_out().pop();
}
return write_length;
}
需要注意的是,在发送报文段的时候,还需要附带接收端的 ackno 和 window_size 数据(如果存在的话)
还有 tick 函数
在 FAQ 中有这么一句话
When should I send a segment with the rst flag set?
If the sender has sent too many consecutive retransmissions without success (more
than TCPConfig::MAX RETX ATTEMPTS, i.e., 8).If the TCPConnection destructor is called while the connection is still active
(active() returns true)
第二点指的就是析构函数里了,第一点当重传次数过多 unclean shutdown 放在 tick 函数里面就好了
void TCPConnection::tick(const size_t ms_since_last_tick) {
_time_since_last_segment_received += ms_since_last_tick;
_sender.tick(ms_since_last_tick);
if(_sender.consecutive_retransmissions() > _cfg.MAX_RETX_ATTEMPTS){
unclean_shutdown();
}
}
再跑一次测试看看
嗯还是一样的错误,啥都没有发。(废话,receive 函数还没写呢,能回复出去就怪了/流汗黄豆)
不过 unclean shutdown 的 warning 已经没了,说明我写的应该没啥问题
找到了一张很牛逼的状态转换图,再配上前面的 lab 里面 sender 和 receiver 的 FSM ,顺着这个来做应该就行了吧
之前实现的 connect 有点瑕疵,当用于双向连接的时候,第二次握手应该带上对于第一次握手 SYN 的确认号
void TCPConnection::connect() {
_sender.fill_window();
TCPSegment seg = _sender.segments_out().front();
if(_receiver.ackno().has_value()){
seg.header().ack = true;
seg.header().ackno = _receiver.ackno().value();
seg.header().win = _receiver.window_size();
}
_segments_out.push(seg);
_sender.segments_out().pop();
}
先把接收 SYN 的部分写出来
void TCPConnection::segment_received(const TCPSegment &seg) {
if(seg.header().rst) unclean_shutdown();
_receiver.segment_received(seg);
_time_since_last_segment_received = 0;
//SYN_RECEIVED
if(seg.header().syn && _sender.next_seqno_absolute() == 0){
connect();
}
}
又跑了一次测试,又报了 unclean shutdown 的 warning。可是这里并不需要 unclean shutdown 啊?难道是每发完一次报文都需要关闭一次连接的意思吗?所以应该在发完报文后,就直接 clean shutdown了
—分割线—
一天又过去了,吭哧吭哧写了一堆又删了一堆,终于过了 43% 的测试了。这玩意属实折磨人,我已经没有写思路的心情了。。
现在 ESTABLISHED 之前的部分应该已经没啥问题了,看测试信息可以正常交换报文了。现在就是本来应该停止发送报文等待关闭连接的时候又发了报文段出去
然后 unclean shutdown 的 warning 依然在
size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); }
size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); }
size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); }
size_t TCPConnection::time_since_last_segment_received() const { return _time_since_last_segment_received; }
void TCPConnection::segment_received(const TCPSegment &seg) {
if(!_active) return ;
// _receiver.segment_received(seg);
_time_since_last_segment_received = 0;
//bool syn_recv = _receiver.ackno().has_value() && !_receiver.stream_out().input_ended();
bool send_empty = false;
if(!_receiver.segment_received(seg)){
send_empty = true;
}
bool listen = !_receiver.ackno().has_value();
bool syn_sent = _sender.next_seqno_absolute() > 0 && _sender.next_seqno_absolute() == _sender.bytes_in_flight();
bool fin_sent = _sender.stream_in().eof() && _sender.next_seqno_absolute() == _sender.stream_in().bytes_written() + 2 && _sender.bytes_in_flight() > 0;
if(seg.header().rst){
if(syn_sent && !seg.header().ack) return;
unclean_shutdown();
return;
}
if(syn_sent && seg.header().ack && seg.payload().size() > 0){ return;}
if(!seg.header().syn && listen && _sender.next_seqno_absolute() == 0){
return;
}
// step 2 of 3-way-handshake
if(seg.header().syn && listen && _sender.next_seqno_absolute() == 0){
connect();
return;
}
if(_sender.next_seqno_absolute() > 0 && seg.header().ack){
if(_sender.ack_received(seg.header().ackno,seg.header().win)){
send_empty = true;
}
}
if(seg.length_in_sequence_space() > 0){
send_empty = true;
}
if(send_empty){
if(!fin_sent && _receiver.ackno().has_value() && _sender.segments_out().empty()){
_sender.send_empty_segment();
}
}
send_segment();
}
bool TCPConnection::active() const { return _active; }
size_t TCPConnection::write(const string &data) {
size_t write_length = _sender.stream_in().write(data);
send_segment();
return write_length;
}
//! \param[in] ms_since_last_tick number of milliseconds since the last call to this method
void TCPConnection::tick(const size_t ms_since_last_tick) {
if(!_active) return ;
_time_since_last_segment_received += ms_since_last_tick;
_sender.tick(ms_since_last_tick);
if(_sender.consecutive_retransmissions() > _cfg.MAX_RETX_ATTEMPTS){
unclean_shutdown();
}
send_segment();
}
void TCPConnection::end_input_stream() {
if(!_active) return;
_sender.stream_in().end_input();
send_segment();
}
void TCPConnection::connect() {
if(!_active) return;
_sender.fill_window();
TCPSegment seg = _sender.segments_out().front();
if(_receiver.ackno().has_value()){
seg.header().ack = true;
seg.header().ackno = _receiver.ackno().value();
seg.header().win = _receiver.window_size();
}
_segments_out.push(seg);
_sender.segments_out().pop();
clean_shutdown();
}
void TCPConnection::send_segment(){
if(!_active) return;
_sender.fill_window();
while (!_sender.segments_out().empty())
{
TCPSegment send = _sender.segments_out().front();
if(_receiver.ackno().has_value()){
send.header().ack = true;
send.header().ackno = _receiver.ackno().value();
send.header().win = _receiver.window_size();
}
_segments_out.push(send);
_sender.segments_out().pop();
}
clean_shutdown();
}
void TCPConnection::unclean_shutdown(){
_active = false;
_sender.stream_in().set_error();
_receiver.stream_out().set_error();
if(!_sender.segments_out().empty()){
TCPSegment rst_seg = _sender.segments_out().front();
rst_seg.header().rst = true;
_segments_out.push(rst_seg);
_sender.segments_out().pop();
}else{
_sender.send_empty_segment();
TCPSegment rst_seg = _sender.segments_out().front();
rst_seg.header().rst = true;
_segments_out.push(rst_seg);
_sender.segments_out().pop();
}
}
void TCPConnection::clean_shutdown(){
if(_receiver.stream_out().input_ended() && !_sender.stream_in().eof()){
_linger_after_streams_finish = false;
}
if(_receiver.stream_out().input_ended() && _sender.stream_in().eof() && _sender.segments_out().empty() && !_sender.bytes_in_flight()){
if(!_linger_after_streams_finish || time_since_last_segment_received() >= 10*_cfg.rt_timeout){
_active = false;
}
}
}
TCPConnection::~TCPConnection() {
try {
if (active()) {
cerr << "Warning: Unclean shutdown of TCPConnection\n";
unclean_shutdown();
// Your code here: need to send a RST segment to the peer
}
} catch (const exception &e) {
std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
}
}