Stanford CS144 Lab3 小结

目录

本文为斯坦福大学计算机网络课程 CS144 编程任务 Lab Assignment 3 的学习小结

官网 https://cs144.github.io/

Lab 3 文档 https://cs144.github.io/assignments/lab3.pdf

个人实验备份代码 https://github.com/deepzheng/sponge


The TCP Sender

我们所实现的 TCPSender 需要具备如下功能:

  • 跟踪接收端的窗口大小,并根据接收端的窗口尺寸调整发送速率(流量控制
  • 尽可能填充发送窗口并发送报文段直到窗口已满或发送数据完成
  • 跟踪已发送但尚未被接收的报文段,并在合适的时机重传(超时重传

本次任务我们主要需要完成以下几个函数

fill_window()

将输入的 ByteStream 尽可能填充到发送窗口中,

ack_received( const WrappingInt32 ackno, const uint16_t window_size)

从接收者发送来的确认报文中,获取确认序列号和接收窗口大小便于 Sender 删除驻留在队列中已经发送但尚未被确认的报文,并调整发送窗口的大小

tick( const size_t ms_since_last_tick )

唯一的调用接口来检查已发送但尚未被接收的报文是否发生超时,若超时将执行重传

send_empty_segment()

发送一段字节长度为 0 的报文段,这样的空报文段不需要被标记为未确认段(outstanding)并且不需要启动计时器也不会被重传

以下是实现思路以及一些小细节

用队列存储发送报文段,同样用一个队列来存储发送后等待被确认的报文段(空报文段不需要被推入该队列)

在连接建立时,TCPSender 会发送一个只含有 SYN 信号的报文段,且该报文段只用于握手,而 FIN 信号可以包含在含有数据的报文段中。当接收端接收到 FIN 信号且成功返回对于 FIN 的确认后,连接才正式关闭。因此,当 SYN 尚未被确认且窗口非 0 时,此时仍然不能发送数据

TCP 实行的是累计确认,即 TCP 只确认该流中至第一个丢失字节为止的字节。 在接收到确认序号 ackno 后,意味着在该序号前的所有字节已经被确认,就可以把这些字节在等待队列中删除。同时,如果发生超时重传,只需要重传等待队列队头的报文即可

对于发送方的 windows size ,还有一个问题需要注意。就是当接收端缓存已满(即窗口大小为 0 )时,将这一信息返回给发送方,并且刚好发送方也没有任何数据要发送。这时候就会出现一个很尴尬的情况,就是发送方接收不到任何来自接收端窗口大小变化的通知(因为 TCP 仅当有数据或者确认要发时才会发送报文段,而窗口大小的信息是附在这些报文段中的)。所以此时发送端相当于被阻塞了。对于这种情况,TCP 规定将窗口大小视为 1 ,在接收端窗口为 0 时继续发送大小为 1 的报文段。这些报文段最终将被接收并返回新的窗口大小。所以在代码实现中加入了 _win_zero_flag 变量

    if (_win_size == 0) {
        _win_zero_flag = true;
        _win_size = 1;
    } else {
        _win_zero_flag = false;
    }

当反馈回来的窗口大小为 0 时,强制调整为 1 ,并将 _win_zero_flag设为 true 用于辅助判断在超时重传时是否需要将重传时间倍增

总的实现代码如下

tcp_sender.hh

class TCPSender {
  private:
    //! our initial sequence number, the number for our SYN.
    WrappingInt32 _isn;

    //! outbound queue of segments that the TCPSender wants sent
    std::queue<TCPSegment> _segments_out{};
    std::queue<TCPSegment> _segments_waiting{};
    //! retransmission timer for the connection
    unsigned int _initial_retransmission_timeout;

    ByteStream _stream;

    uint64_t _next_seqno{0}; // 下一个将被发送的字节序号
    uint64_t _ack_seqno{0}; // 最大的被确认的字节序号
    uint64_t _win_size{1};
    size_t _consecutive_retransmission{0};
    unsigned int _total_time{0};
    unsigned int _retransmisson_timeout{0};
    bool _is_timer_running{false};
    bool _is_syn{false};
    bool _is_fin{false};
    bool _win_zero_flag{false};
    
  public:
    //! Initialize a TCPSender
    TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
              const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
              const std::optional<WrappingInt32> fixed_isn = {});


    ByteStream &stream_in() { return _stream; }
    const ByteStream &stream_in() const { return _stream; }

    std::queue<TCPSegment> &segments_waiting() { return _segments_waiting; }
    const std::queue<TCPSegment> &segments_waiting() const { return _segments_waiting; }


    void ack_received(const WrappingInt32 ackno, const uint16_t window_size);

    void send_empty_segment();


    void fill_window();

    void tick(const size_t ms_since_last_tick);

    size_t bytes_in_flight() const;

    unsigned int consecutive_retransmissions() const;

    std::queue<TCPSegment> &segments_out() { return _segments_out; }

    uint64_t next_seqno_absolute() const { return _next_seqno; }

    WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }
};

tcp_sender.cc

TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
    : _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
    , _initial_retransmission_timeout{retx_timeout}
    , _stream(capacity)
    , _retranmisson_timeout(retx_timeout) {}

uint64_t TCPSender::bytes_in_flight() const { return _next_seqno - _ack_seqno; }

void TCPSender::fill_window() {
    if (_win_size == 0 || _is_fin) {
        return;
    }
    TCPSegment seg;
    if (!_is_syn) {
        // only send a syn flag in order to handshake
        _is_syn = true;
        seg.header().syn = true;
        seg.header().seqno = next_seqno();
        _next_seqno++;
        _win_size--;
        segments_out().push(seg);
        segments_waiting().push(seg);
    } else if (stream_in().eof()) {
        // only send a fin flag
        _is_fin = true;
        seg.header().fin = true;
        seg.header().seqno = next_seqno();
        _next_seqno++;
        _win_size--;
        segments_out().push(seg);
        segments_waiting().push(seg);
    } else {
        // send the normal payload
        while (_win_size > 0 && !stream_in().buffer_empty()) {
            seg.header().seqno = next_seqno();
            uint64_t send_len = min(_win_size, min(TCPConfig::MAX_PAYLOAD_SIZE, stream_in().buffer_size()));
            seg.payload() = stream_in().read(send_len);
            if (seg.length_in_sequence_space() < _win_size && stream_in().eof()) {
                _is_fin = true;
                seg.header().fin = true;
            }
            _next_seqno += seg.length_in_sequence_space();
            _win_size -= seg.length_in_sequence_space();
            segments_out().push(seg);
            segments_waiting().push(seg);
        }
    }
    if (!_is_timer_running) {
        // start timer
        _is_timer_running = true;
        _total_time = 0;
    }
}

//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
    uint64_t ack_abs = unwrap(ackno, _isn, _next_seqno);
    _win_size = window_size;
    if (ack_abs > _next_seqno || ack_abs <= _ack_seqno)
        return;

    _retranmisson_timeout = _initial_retransmission_timeout;
    _consecutive_retransmission = 0;
    while (!segments_waiting().empty()) {
        TCPSegment seg = segments_waiting().front();
        uint64_t seqno = unwrap(seg.header().seqno, _isn, _next_seqno) + seg.length_in_sequence_space();
        if (seqno <= ack_abs) {
            _ack_seqno = seqno;
            segments_waiting().pop();
        } else
            break;
    }
    if (!segments_waiting().empty()) {
        // restart timer
        _total_time = 0;
        _is_timer_running = true;
    }
    //如果没有发送但未确认的报文,关闭计时器
    if (bytes_in_flight() == 0) {
        _is_timer_running = false;
    }
    if (bytes_in_flight() > window_size) {
        //当发送中的数据大于窗口尺寸时,暂停发送新数据
        _win_size = 0;
        _win_zero_flag = true;
        return;
    }
    if (_win_size == 0) {
        _win_zero_flag = true;
        _win_size = 1;
    } else {
        _win_zero_flag = false;
    }
    fill_window();
}

//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
    _total_time += ms_since_last_tick;

    if (_total_time >= _retransmisson_timeout && !segments_waiting().empty()) {
        // only retransmit the head of queue
        segments_out().push(segments_waiting().front());
        if (!_win_zero_flag) {
            _consecutive_retransmission++;
            _retranmisson_timeout *= 2;
        }
        _total_time = 0;
    }
    if (segments_waiting().empty()) {
        _is_timer_running = false;
    }
}

unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmission; }

void TCPSender::send_empty_segment() {
    TCPSegment seg;
    seg.header().seqno = next_seqno();
    segments_out().push(seg);
}

小结

这次的 Lab 明显比前几次的难度提升了一个档次。完全读通文档的要求就挺费精力的了,等到具体的代码实现,又开始不知所措了。勉勉强强写完最后又是一堆错误,连写带调(中间也参考了一下别人的思路)花掉了快两天时间,终于过了测试,虽然感觉自己的代码非常的丑陋qwq。不得不说,随着实验的深入,要考虑的细节越来越多了,虽然只是个简易的 TCP,但是对于各种边边角角的情况都需要考虑到,这几天基本就指着测试来帮我找坑了。。

终于只剩下最后一个 Lab 了,虽然 Lab4 的变态程度早已听闻。。不过不要怕,胜利就在眼前,干就完事了!

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦