Stanford CS144: Introduction to Computer Networking — Checkpoint 1

Tecy 发布于 6 天前 21 次阅读


实现 Reassembler

我为什么要这么做？TCP 之所以能够应对报文的乱序和重复，是因为它能把字节流中任意位置的片段重新拼接回原始字节流。把这一功能实现为一个独立、可单独测试的模块，会让后续处理传入的报文段更加容易。

reassembler.hh

#pragma once

#include "byte_stream.hh"

#include <set>


// One buffered piece of the stream, covering indices [first_index_, first_index_ + size()).
// Ordering looks only at the starting index, so a std::set<Seg> keeps pieces sorted by
// position and treats two pieces with the same start as equivalent.
class Seg
{
public:
  Seg( uint64_t first_index, std::string data, bool is_last_substring )
    : first_index_ { first_index }, data_ { std::move( data ) }, is_last_substring_ { is_last_substring }
  {}

  // Strict weak ordering by starting index only.
  bool operator<( const Seg& other ) const { return first_index_ < other.first_index_; }

  // Number of bytes carried by this piece.
  uint64_t size() const { return data_.size(); }

  uint64_t first_index_;   // stream index of data_[0]
  std::string data_;       // the bytes of this piece
  bool is_last_substring_; // true if this piece ends the stream
};

// Reassembles indexed, possibly out-of-order and overlapping substrings of a byte stream
// and writes them, in order, into the ByteStream it owns (`output_`).
class Reassembler
{
public:
  // Construct Reassembler to write into given ByteStream.
  explicit Reassembler( ByteStream&& output ) : output_( std::move( output ) ) {}

  /*
   * Insert a new substring to be reassembled into a ByteStream.
   *   `first_index`: the index of the first byte of the substring
   *   `data`: the substring itself
   *   `is_last_substring`: this substring represents the end of the stream
   *
   * No Writer is passed in: bytes are written through the Reassembler's own `output_` stream.
   *
   * The Reassembler's job is to reassemble the indexed substrings (possibly out-of-order
   * and possibly overlapping) back into the original ByteStream. As soon as the Reassembler
   * learns the next byte in the stream, it should write it to the output.
   *
   * If the Reassembler learns about bytes that fit within the stream's available capacity
   * but can't yet be written (because earlier bytes remain unknown), it should store them
   * internally until the gaps are filled in.
   *
   * The Reassembler should discard any bytes that lie beyond the stream's available capacity
   * (i.e., bytes that couldn't be written even if earlier gaps get filled in).
   *
   * The Reassembler should close the stream after writing the last byte.
   */
  void insert( uint64_t first_index, std::string data, bool is_last_substring );

  // How many bytes are stored in the Reassembler itself (buffered but not yet written)?
  // This function is for testing only; don't add extra state to support it.
  uint64_t count_bytes_pending() const;

  // Access output stream reader
  Reader& reader() { return output_.reader(); }
  const Reader& reader() const { return output_.reader(); }

  // Access output stream writer, but const-only (can't write from outside)
  const Writer& writer() const { return output_.writer(); }

private:

  uint64_t next_index_ = 0; // stream index of the next byte to be written to output_.
  std::set<Seg> segs_ {}; // substrings buffered until earlier gaps are filled, ordered by first index.

  ByteStream output_;
};

reassembler.cc

#include "reassembler.hh"
#include "debug.hh"

using namespace std;

void Reassembler::insert( uint64_t first_index, string data, bool is_last_substring )
{
  // debug( "unimplemented insert({}, {}, {}) called", first_index, data, is_last_substring );

  // stream.
  Writer& stream = output_.writer();

  debug("before first_index: {}, next_index: {}, ac: {}", first_index, next_index_, stream.available_capacity());
  if (first_index + data.size() <= next_index_) {
    if (is_last_substring) {
      stream.close();
    }
    return;
  }

  if (first_index < next_index_) {
    data = data.substr(next_index_ - first_index);
    first_index = next_index_;
  }

  if (first_index >= next_index_ + stream.available_capacity()) {
    return;
  }

  if (first_index + data.size() > next_index_ + stream.available_capacity()) {
    data.resize(next_index_ + stream.available_capacity() - first_index); // bug fixed
    is_last_substring = false;
  }

  Seg cur(first_index, data, is_last_substring);
  auto p = segs_.upper_bound(cur);

  // prev segment exists?
  if (p != segs_.begin()) {
    auto prev_ = std::prev(p);

    // prev segment and current segment are mergeable?
    // suppose, prev [lp, rp), cur [lc, rc).
    // mergeable: rp >= lc.
    if (prev_->first_index_ + prev_->size() >= cur.first_index_) {

      // prev segment contains current segment.
      // contain: rp >= rc.
      if (prev_->first_index_ + prev_->size() >= cur.first_index_ + cur.size()) {
        return;
      }

      // merge two segments.
      // new segment: [lp, rc).
      std::string tmp = prev_->data_ + cur.data_.substr(prev_->first_index_ + prev_->size() - cur.first_index_);

      // update current segment.
      cur = Seg(prev_->first_index_, std::move(tmp), cur.is_last_substring_);

      // erase prev segment.
      segs_.erase(prev_);
    }
  }

  // current segment contains p segment?
  // remove p.
  while (p != segs_.end() and cur.first_index_ + cur.size() >= p->first_index_ + p->size()) {
    auto next_ = std::next(p);
    segs_.erase(p);
    p = next_;
  }

  // next segment exist?
  if (p != segs_.end()) {
    // next segment and current segment are mergeable?
    if (cur.first_index_ + cur.size() >= p->first_index_) {
      // should we check contains?
      // No!, we have checked before.

      // merge two segment.
      std::string tmp = cur.data_ + p->data_.substr(cur.first_index_ + cur.size() - p->first_index_);

      // update segment.
      cur = Seg(cur.first_index_, std::move(tmp), p->is_last_substring_);

      // remove p.
      segs_.erase(p); 
    }
  }

  // insert current segment.
  segs_.insert(cur);

  debug("first_index: {}, next_index: {}, ac: {}", segs_.begin()->first_index_, next_index_, stream.available_capacity());

  // substring can be put into stream? 
  if (next_index_ >= segs_.begin()->first_index_ and stream.available_capacity() > 0) {
    Seg first_ = *segs_.begin();

    debug("data: {}", first_.data_);
    if (stream.available_capacity() >= first_.size()) {
      next_index_ += first_.size();
      stream.push(first_.data_);
      if (first_.is_last_substring_) {
        stream.close();
      }
      segs_.erase(segs_.begin());
    } else {
      next_index_ += stream.available_capacity();
      stream.push(first_.data_);
      segs_.erase(segs_.begin());
    }
  }
}

// How many bytes are stored in the Reassembler itself?
// This function is for testing only; don't add extra state to support it.
uint64_t Reassembler::count_bytes_pending() const
{
  // debug( "unimplemented count_bytes_pending() called" );

  uint64_t count = 0;
  for (auto& seg : segs_) {
    count += seg.size();
  }
  return count;
}