SSTable之文件格式及生成-leveldb源码剖析(7)

SSTable文件的生成时机,一是在将Immutable Memtable的内存数据flush落盘时,二是在做major compaction合并文件时,两者实际均通过封装的TableBuilder完成,本文仅讨论SSTable的文件格式,以及TableBuilder(包含BlockBuilder、FilterBlockBuilder)的运行过程。

在leveldb的doc下有table_format.txt阐述了文件设计构成,最新版本的SSTable文件名后缀为ldb,同时保留了sst后缀的兼容,我们先从宏观上来看SSTable文件的构成格式:sstable_format在当前版本,meta_block与其对应的meta_index_block并没有完全实现,该部分代之以Options可选配置的方式填充filter_block信息,并将filter_block_handle编码后写入meta_index_block,也就是说现有meta_index_block仅包含filter_meta_block对应的元数据(索引信息),将来对meta_block的实现像data_block一样会在BlockBuilder中完成,meta_block可以对应保存data_block的元数据,meta_index_block则写入全部meta_block对应的元数据。

下面我们具体看下各个block的详细格式,实际在落盘时会在每个block尾部追加5个字节,其中一字节的类型字段,如是否压缩,另外四字节为数据块的CRC32校验。为表意格式设计,不再对尾部单独说明。好,先来看data_block:data_block_format上图是整个data_block的编码格式,KV对以紧邻的前一key的最长公共前缀进行压缩编码为delta_string,value不再编码,以Options可配置的block_restart_interval对应数量的KV对记录当前偏移信息写入restarts部分。在最终写入整个数据块时可以根据配置选择是否进行压缩。

再来看filter_block:
filter_indexFilterPolicy可指定为leveldb默认提供的BloomFilterPolicy,当前leveldb默认的data block大小为4KB,filter policy默认的base data为2KB,尾部存储对应基数,filter_index_offset存储filter index对应数组的起始offset,偏移数组的元素则存储具体每个filter block的offset。

接着来看index_block,该 block的构建与data_block一样都是通过BlockBuilder,自然文件格式也是完全一致的,但需要注意存储的KV对,key为通过当前data_block最后一个key与下一紧邻data_block第一个key的最长公共前缀来构造一个在两者中间尽可能短的索引key,简单举例来说,若当前data_block最后一个key为”the quick brown fox”,下一块第一个key为”the who”,则索引key为”the r”;value部分则通过pending_handle(BlockHandle类)记录编码到当前data_block的offset和当前块大小再对offset和size进行varint64编码得到。在SSTable的全部data_block写完后,整个index_block的数据构建完毕,后续会落盘。该部分不再作图。

最后来看footer部分:footer-formatFooter固定48B,分别存储metaindex_handle和index_handle的当前编码offser和size,相当于metaindex和index的索引部分,最后加入8字节的小端编码的magic-number。

下面开始设计和实现很优雅的SSTable文件的构建生成部分,上源码:

struct TableBuilder::Rep {
  Options options;
  Options index_block_options;
  WritableFile* file;
  uint64_t offset;
  Status status;
  BlockBuilder data_block;
  BlockBuilder index_block;
  std::string last_key;
  int64_t num_entries;
  bool closed;          // Either Finish() or Abandon() has been called.
  FilterBlockBuilder* filter_block;

  bool pending_index_entry;
  BlockHandle pending_handle;  // Handle to add to index block

  std::string compressed_output;
};

void TableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(!r->closed);
  if (!ok()) return;
  if (r->num_entries > 0) {
    assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
  }

  if (r->pending_index_entry) {
    assert(r->data_block.empty());
    r->options.comparator->FindShortestSeparator(&r->last_key, key);
    std::string handle_encoding;
    r->pending_handle.EncodeTo(&handle_encoding);
    r->index_block.Add(r->last_key, Slice(handle_encoding));
    r->pending_index_entry = false;
  }

  if (r->filter_block != NULL) {
    r->filter_block->AddKey(key);
  }

  r->last_key.assign(key.data(), key.size());
  r->num_entries++;
  r->data_block.Add(key, value);

  const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
  if (estimated_block_size >= r->options.block_size) {
    Flush();
  }
}

void TableBuilder::Flush() {
  Rep* r = rep_;
  assert(!r->closed);
  if (!ok()) return;
  if (r->data_block.empty()) return;
  assert(!r->pending_index_entry);
  WriteBlock(&r->data_block, &r->pending_handle);
  if (ok()) {
    r->pending_index_entry = true;
    r->status = r->file->Flush();
  }
  if (r->filter_block != NULL) {
    r->filter_block->StartBlock(r->offset);
  }
}

void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
  // File format contains a sequence of blocks where each block has:
  //    block_data: uint8[n]
  //    type: uint8
  //    crc: uint32
  assert(ok());
  Rep* r = rep_;
  Slice raw = block->Finish();

  Slice block_contents;
  CompressionType type = r->options.compression;
  // TODO(postrelease): Support more compression options: zlib?
  switch (type) {
    case kNoCompression:
      block_contents = raw;
      break;

    case kSnappyCompression: {
      std::string* compressed = &r->compressed_output;
      if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
          compressed->size() < raw.size() - (raw.size() / 8u)) {
        block_contents = *compressed;
      } else {
        // Snappy not supported, or compressed less than 12.5%, so just
        // store uncompressed form
        block_contents = raw;
        type = kNoCompression;
      }
      break;
    }
  }
  WriteRawBlock(block_contents, type, handle);
  r->compressed_output.clear();
  block->Reset();
}

void TableBuilder::WriteRawBlock(const Slice& block_contents,
                                 CompressionType type,
                                 BlockHandle* handle) {
  Rep* r = rep_;
  handle->set_offset(r->offset);
  handle->set_size(block_contents.size());
  r->status = r->file->Append(block_contents);
  if (r->status.ok()) {
    char trailer[kBlockTrailerSize];
    trailer[0] = type;
    uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
    crc = crc32c::Extend(crc, trailer, 1);  // Extend crc to cover block type
    EncodeFixed32(trailer+1, crc32c::Mask(crc));
    r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
    if (r->status.ok()) {
      r->offset += block_contents.size() + kBlockTrailerSize;
    }
  }
}

Status TableBuilder::Finish() {
  Rep* r = rep_;
  Flush();
  assert(!r->closed);
  r->closed = true;

  BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;

  // Write filter block
  if (ok() && r->filter_block != NULL) {
    WriteRawBlock(r->filter_block->Finish(), kNoCompression,
                  &filter_block_handle);
  }

  // Write metaindex block
  if (ok()) {
    BlockBuilder meta_index_block(&r->options);
    if (r->filter_block != NULL) {
      // Add mapping from "filter.Name" to location of filter data
      std::string key = "filter.";
      key.append(r->options.filter_policy->Name());
      std::string handle_encoding;
      filter_block_handle.EncodeTo(&handle_encoding);
      meta_index_block.Add(key, handle_encoding);
    }

    // TODO(postrelease): Add stats and other meta blocks
    WriteBlock(&meta_index_block, &metaindex_block_handle);
  }

  // Write index block
  if (ok()) {
    if (r->pending_index_entry) {
      r->options.comparator->FindShortSuccessor(&r->last_key);
      std::string handle_encoding;
      r->pending_handle.EncodeTo(&handle_encoding);
      r->index_block.Add(r->last_key, Slice(handle_encoding));
      r->pending_index_entry = false;
    }
    WriteBlock(&r->index_block, &index_block_handle);
  }

  // Write footer
  if (ok()) {
    Footer footer;
    footer.set_metaindex_handle(metaindex_block_handle);
    footer.set_index_handle(index_block_handle);
    std::string footer_encoding;
    footer.EncodeTo(&footer_encoding);
    r->status = r->file->Append(footer_encoding);
    if (r->status.ok()) {
      r->offset += footer_encoding.size();
    }
  }
  return r->status;
}

Add方法是SSTable构建的入口。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注