log日志文件-leveldb源码剖析(4)

往leveldb写一个KV对时先写log,再写MemTable,这样完成一次写入,写log是追加顺序写文件,写MemTable是跳表插入写内存,随机写转化成顺序写,只涉及一次磁盘IO及一次内存写,因此leveldb写入速度非常快。先写log也是为防止发生如进程挂掉时,MemTable的数据仍然可以从log进行重建恢复,不会造成数据丢失。

log文件的读写均是顺序IO,因此不需要额外的索引信息。

leveldb把每次写操作都会以日志文件记录下来,先来看格式,相关定义如下:

namespace log {

enum RecordType {
  // Zero is reserved for preallocated files
  kZeroType = 0,

  kFullType = 1,

  // For fragments
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4
};
static const int kMaxRecordType = kLastType;

static const int kBlockSize = 32768;

// Header is checksum (4 bytes), length (2 bytes), type (1 byte).
static const int kHeaderSize = 4 + 2 + 1;

}  

日志是分块写的,每块大小为32K,每条记录有7个字节的头部,前四字节为CRC校验,中间两字节为长度,最后一字节为记录类型。块大小固定有限,而记录有可能跨块,因此有三个枚举值kFirstType、kMiddleType、kLastType分别用来标记记录位置。

Writer类提供了AddRecord的写接口,再看具体实现:

Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();
  size_t left = slice.size();

  // Fragment the record if necessary and emit it.  Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record
  Status s;
  bool begin = true;
  do {
    const int leftover = kBlockSize - block_offset_;
    assert(leftover >= 0);
    if (leftover < kHeaderSize) {
      // Switch to a new block
      if (leftover > 0) {
        // Fill the trailer (literal below relies on kHeaderSize being 7)
        assert(kHeaderSize == 7);
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      block_offset_ = 0;
    }

    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    const size_t fragment_length = (left < avail) ? left : avail;

    RecordType type;
    const bool end = (left == fragment_length);
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }

    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
  } while (s.ok() && left > 0);
  return s;
}

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
  assert(n <= 0xffff);  // Must fit in two bytes
  assert(block_offset_ + kHeaderSize + n <= kBlockSize);

  // Format the header
  char buf[kHeaderSize];
  buf[4] = static_cast(n & 0xff);
  buf[5] = static_cast(n >> 8);
  buf[6] = static_cast(t);

  // Compute the crc of the record type and the payload.
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
  crc = crc32c::Mask(crc);                 // Adjust for storage
  EncodeFixed32(buf, crc);

  // Write the header and the payload
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    s = dest_->Append(Slice(ptr, n));
    if (s.ok()) {
      s = dest_->Flush();
    }
  }
  block_offset_ += kHeaderSize + n;
  return s;
}

块大小固定32K,可能跨块,分片写入,首先计算当前块剩余是否足够容纳记录头,若不能全部填充空串(ASCII码0),若恰好仅能容纳记录头则允许插入一次0长的记录;若当前块剩余大于记录头的7字节大小,则计算是否需要分片及设置相应记录标记;之后设置相应记录头和计算CRC校验,写入实际记录内容,写状态成功后flush数据到页面缓存(page cache)。

leveldb写记录使用了CRC32校验,这里来复习下CRC校验算法:
采用 CRC 校验时,发送方和接收方用同一个生成多项式g(x),g(x)是一个GF(2)多项式,并且g(x)的首位和最后一位的系数必须为1。
CRC的处理方法是:发送方用发送数据的二进制多项式t(x)除以g(x),得到余数y(x)作为CRC校验码。校验时,以计算的校正结果是否为0为据,判断数据帧是否出错。设生成多项式是r阶的(最高位是x^r)具体步骤如下面的描述。
发送方:
1 )在发送的 m 位数据的二进制多项式 t(x) 后添加 r 个 0 ,扩张到 m+ r 位,以容纳 r 位的校验码,追加 0 后的二进制多项式为 T(x) ;
2 )用 T(x) 除以生成多项式 g(x) ,得到 r 位的余数 y(x) ,它就是 CRC 校验码;
3 )把 y(x) 追加到 t(x) 后面,此时的数据 s(x) 就是包含了 CRC 校验码的待发送字符串;由于 s(x) = t(x) y(x) ,因此 s(x) 肯定能被 g(x) 除尽。
接收方:
1 )接收数据 n(x) ,这个 n(x) 就是包含了 CRC 校验码的 m+r 位数据;
2 )计算 n(x) 除以 g(x) ,如果余数为 0 则表示传输过程没有错误,否则表示有错误。从 n(x) 去掉尾部的 r 位数据,得到的就是原始数据。
生成多项式不是随意选择的,标准的CRC32生成多项式:
x^32 + x^26 + x^23 + x^22 + x^16 + x^12 + x^11+ x^10 + x^8 + x^7 + x^5 + x^4 + x^2 + x + 1
以16进制表示就是0x04C11DB7。
实际的CRC32算法是面向字节的,而非低效的单比特处理,leveldb对该算法的具体代码实现,有时间再仔细分析。

为什么以32K这种固定块大小写数据?个人认为相对于写KV记录大小和读取记录的buffer数据块,32K可能是在性能上比较适宜的大小。

再来看日志记录的读取,Reader类提供了ReadRecord的读接口:

bool Reader::ReadRecord(Slice* record, std::string* scratch) {
  if (last_record_offset_ < initial_offset_) {
    if (!SkipToInitialBlock()) {
      return false;
    }
  }

  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    const unsigned int record_type = ReadPhysicalRecord(&fragment);

    // ReadPhysicalRecord may have only had an empty trailer remaining in its
    // internal buffer. Calculate the offset of the next physical record now
    // that it has returned, properly accounting for its header size.
    uint64_t physical_record_offset =
        end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();

    if (resyncing_) {
      if (record_type == kMiddleType) {
        continue;
      } else if (record_type == kLastType) {
        resyncing_ = false;
        continue;
      } else {
        resyncing_ = false;
      }
    }

    switch (record_type) {
      case kFullType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(1)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(2)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kEof:
        if (in_fragmented_record) {
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}

Reader构造函数是可以指定初始读偏移(initial_offset_)的,如果上一次已读取记录的偏移(last_record_offset_)小于初始偏移,则在读取时首先需要校正读起始位置到当前块(32K)起始位置,若块内偏移在尾部不足记录头的大小范围内,则跳到下一块,如果skip调整失败直接返回读失败;下面是读取实际物理记录,初次读取可能需要resyncing跳过非初始块;也有可能读取到尾部0长的记录,其记录类型仅可能为kFirstType(某条新记录的初始),紧邻下一块可能为kFirstType或kFullType,Reader对这两种情形实现上做了兼容考虑;读取记录是否分片(fragment)跨块用in_fragmented_record来标记,结合该标记和记录类型,完成相应判断返回结果和读状态。
着重说明下几个成员变量,initial_offset_是指定的初始读偏移(相对于log文件),last_record_offset_是已读取到的记录偏移(相对于当前block块),end_of_buffer_offset_是已读取块偏移(相对于log文件),buffer_不仅用来缓存块数据,也用来配合计算实际读取的块内记录偏移。

下面看读取实际的记录过程:

unsigned int Reader::ReadPhysicalRecord(Slice* result) {
  while (true) {
    if (buffer_.size() < kHeaderSize) {
      if (!eof_) {
        // Last read was a full read, so this is a trailer to skip
        buffer_.clear();
        Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
        end_of_buffer_offset_ += buffer_.size();
        if (!status.ok()) {
          buffer_.clear();
          ReportDrop(kBlockSize, status);
          eof_ = true;
          return kEof;
        } else if (buffer_.size() < kBlockSize) {
          eof_ = true;
        }
        continue;
      } else {
        // Note that if buffer_ is non-empty, we have a truncated header at the
        // end of the file, which can be caused by the writer crashing in the
        // middle of writing the header. Instead of considering this an error,
        // just report EOF.
        buffer_.clear();
        return kEof;
      }
    }

    // Parse the header
    const char* header = buffer_.data();
    const uint32_t a = static_cast(header[4]) & 0xff;
    const uint32_t b = static_cast(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    if (kHeaderSize + length > buffer_.size()) {
      size_t drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        ReportCorruption(drop_size, "bad record length");
        return kBadRecord;
      }
      // If the end of the file has been reached without reading |length| bytes
      // of payload, assume the writer died in the middle of writing the record.
      // Don't report a corruption.
      return kEof;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        size_t drop_size = buffer_.size();
        buffer_.clear();
        ReportCorruption(drop_size, "checksum mismatch");
        return kBadRecord;
      }
    }

    buffer_.remove_prefix(kHeaderSize + length);

    // Skip physical record that started before initial_offset_
    if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
        initial_offset_) {
      result->clear();
      return kBadRecord;
    }

    *result = Slice(header + kHeaderSize, length);
    return type;
  }
}

内部使用了buffer_缓存当前数据块,eof_用来标记文件是否读到结束,接着是解析头部,如果指定了CRC32校验,则对当前记录进行校验,更新buffer_已读取偏移以备下次顺序IO使用,最后构造实际数据填充结果。

leveldb作者在doc/log_format.txt中对这种日志格式设计的优缺点给予了说明,好处如下:
(1)不需要做额外的启发式resyncing,出错直接跳到下一块。
(2)支持如mapreduce所需的文件切分机制,按照完整逻辑记录切分即可。
(3)大记录读写无需添加外信息或buffer。
缺点:
(1)小记录无pack。
(2)不支持压缩。
这两点都可以通过添加新的记录类型支持。

refer:
1. http://blog.csdn.net/sparkliang/article/details/5671510

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注