SSTable之两级Cache-leveldb源码剖析(8)

两级Cache,即TableCache和BlockCache,前者用于缓存SSTable的对应索引元数据,后者用于缓存文件block数据块。本篇主要分析key查找在MemTable(包括mem和imm)均未命中后续的查找过程,主要是Cache这部分的实现。

来看TableCache,先简单梳理下到TableCache的查找流程,用户提交key查询,交由Status DBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) ,获取两种MemTable和当前Version,依次查询memtable、immutable memtable ,未找到则在当前Version上Status Version::Get(const ReadOptions& options, const LookupKey& k, std::string* value, GetStats* stats),依次从最低level到最高level查询直至查到,在每层确定可能包含该key的SSTable文件后,就会在所属VersionSet的table_cache中继续查询,即调用Status TableCache::Get(const ReadOptions& options, uint64_t file_number, uint64_t file_size, const Slice& k, void* arg, void (*handle_result)(void*, const Slice&, const Slice&));。
让我们来看TableCache::Get的实现,这也是本篇关注的,先上代码:

Status TableCache::Get(const ReadOptions& options,
                       uint64_t file_number,
                       uint64_t file_size,
                       const Slice& k,
                       void* arg,
                       void (*saver)(void*, const Slice&, const Slice&)) {
  Cache::Handle* handle = NULL;
  Status s = FindTable(file_number, file_size, &handle);
  if (s.ok()) {
    Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
    s = t->InternalGet(options, k, arg, saver);
    cache_->Release(handle);
  }
  return s;
}

Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
                             Cache::Handle** handle) {
  Status s;
  char buf[sizeof(file_number)];
  EncodeFixed64(buf, file_number);
  Slice key(buf, sizeof(buf));
  *handle = cache_->Lookup(key);
  if (*handle == NULL) {
    std::string fname = TableFileName(dbname_, file_number);
    RandomAccessFile* file = NULL;
    Table* table = NULL;
    s = env_->NewRandomAccessFile(fname, &file);
    if (!s.ok()) {
      std::string old_fname = SSTTableFileName(dbname_, file_number);
      if (env_->NewRandomAccessFile(old_fname, &file).ok()) {
        s = Status::OK();
      }
    }
    if (s.ok()) {
      s = Table::Open(*options_, file, file_size, &table);
    }

    if (!s.ok()) {
      assert(table == NULL);
      delete file;
      // We do not cache error results so that if the error is transient,
      // or somebody repairs the file, we recover automatically.
    } else {
      TableAndFile* tf = new TableAndFile;
      tf->file = file;
      tf->table = table;
      *handle = cache_->Insert(key, tf, 1, &DeleteEntry);
    }
  }
  return s;
}

TableCache的Get方法通过传入SSTable的file_number和file_size来调用私有的FindTable方法来获取该SSTable对应的Cache::Handle句柄(即TableCache的对应entry),这里同时完成若未命中TableCache则打开该SSTable对应的文件并预加载索引信息到内存,以TableAndFile的形式保存相应句柄,并插入TableCache。
在此我们可以看到TableCache的KV格式,以file_number作key,以TableAndFile对应的预加载索引作为value。

另外,TableCache entry的插入在Compaction时也有体现,每当通过Compatction生成新的SSTable文件,也会以验证正常可用的方式更新该SSTable的索引信息到TableCache。

获取该key所属SSTable的句柄后,调用Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg, void (*saver)(void*, const Slice&, const Slice&))查找真正包含该key的数据块,来看实现:

Status Table::InternalGet(const ReadOptions& options, const Slice& k,
                          void* arg,
                          void (*saver)(void*, const Slice&, const Slice&)) {
  Status s;
  Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
  iiter->Seek(k);
  if (iiter->Valid()) {
    Slice handle_value = iiter->value();
    FilterBlockReader* filter = rep_->filter;
    BlockHandle handle;
    if (filter != NULL &&
        handle.DecodeFrom(&handle_value).ok() &&
        !filter->KeyMayMatch(handle.offset(), k)) {
      // Not found
    } else {
      Iterator* block_iter = BlockReader(this, options, iiter->value());
      block_iter->Seek(k);
      if (block_iter->Valid()) {
        (*saver)(arg, block_iter->key(), block_iter->value());
      }
      s = block_iter->status();
      delete block_iter;
    }
  }
  if (s.ok()) {
    s = iiter->status();
  }
  delete iiter;
  return s;
}

通过创建索引块index_block的迭代器来定位key所属数据块的索引元数据,调用BlockReader并返回该数据块的迭代器,这时就需要查找本篇的第二个Cache,即BlockCache,来看代码:

// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
Iterator* Table::BlockReader(void* arg,
                             const ReadOptions& options,
                             const Slice& index_value) {
  Table* table = reinterpret_cast<Table*>(arg);
  Cache* block_cache = table->rep_->options.block_cache;
  Block* block = NULL;
  Cache::Handle* cache_handle = NULL;

  BlockHandle handle;
  Slice input = index_value;
  Status s = handle.DecodeFrom(&input);
  // We intentionally allow extra stuff in index_value so that we
  // can add more features in the future.

  if (s.ok()) {
    BlockContents contents;
    if (block_cache != NULL) {
      char cache_key_buffer[16];
      EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
      EncodeFixed64(cache_key_buffer+8, handle.offset());
      Slice key(cache_key_buffer, sizeof(cache_key_buffer));
      cache_handle = block_cache->Lookup(key);
      if (cache_handle != NULL) {
        block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
      } else {
        s = ReadBlock(table->rep_->file, options, handle, &contents);
        if (s.ok()) {
          block = new Block(contents);
          if (contents.cachable && options.fill_cache) {
            cache_handle = block_cache->Insert(
                key, block, block->size(), &DeleteCachedBlock);
          }
        }
      }
    } else {
      s = ReadBlock(table->rep_->file, options, handle, &contents);
      if (s.ok()) {
        block = new Block(contents);
      }
    }
  }

  Iterator* iter;
  if (block != NULL) {
    iter = block->NewIterator(table->rep_->options.comparator);
    if (cache_handle == NULL) {
      iter->RegisterCleanup(&DeleteBlock, block, NULL);
    } else {
      iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
    }
  } else {
    iter = NewErrorIterator(s);
  }
  return iter;
}

通过在该SSTable所属的BlockCache中查找该数据块,若未命中,则从文件读取该数据块,并将本次结果写入BlockCache。
在此我们可以看到BlockCache的KV格式,以cache_id和该数据块的offset(通过传入的数据块索引可知)联合作key,以包含BlockContents的Block作为value。

到此两级Cache的读取和更新写入已经比较清晰,下面再补充一下leveldb对Cache数据结构的设计:

class LRUCache {
 public:
  LRUCache();
  ~LRUCache();

  // Separate from constructor so caller can easily make an array of LRUCache
  void SetCapacity(size_t capacity) { capacity_ = capacity; }

  // Like Cache methods, but with an extra "hash" parameter.
  Cache::Handle* Insert(const Slice& key, uint32_t hash,
                        void* value, size_t charge,
                        void (*deleter)(const Slice& key, void* value));
  Cache::Handle* Lookup(const Slice& key, uint32_t hash);
  void Release(Cache::Handle* handle);
  void Erase(const Slice& key, uint32_t hash);
  void Prune();
  size_t TotalCharge() const {
    MutexLock l(&mutex_);
    return usage_;
  }

 private:
  void LRU_Remove(LRUHandle* e);
  void LRU_Append(LRUHandle* e);
  void Unref(LRUHandle* e);

  // Initialized before use.
  size_t capacity_;

  // mutex_ protects the following state.
  mutable port::Mutex mutex_;
  size_t usage_;

  // Dummy head of LRU list.
  // lru.prev is newest entry, lru.next is oldest entry.
  LRUHandle lru_;

  HandleTable table_;
};

以上是LRUCache的接口设计,主要由两大组件LRUHandle和HandleTable构成,因此在插入(Insert)或者删除(Erase)时需要对这两个组件都要处理。
其中LRUHandle是一个双向链表,节点存储实际的KV数据并按照访问时间排序,注意该结构已经使用hash成员保存了对应key的哈希值,不必重新计算;next_hash用于hash冲突时探测;一个LRUHandle结构体把用来做LRU访问需要使用的带有前驱与后继的双向链表和解决哈希冲突的单链表有机结合了起来。
HandleTable用于hash分桶,存储LRUHandle指针。

这里格外强调HandleTable用于解决冲突时在冲突链中FindPointer方法:

  // Return a pointer to slot that points to a cache entry that
  // matches key/hash.  If there is no such cache entry, return a
  // pointer to the trailing slot in the corresponding linked list.
  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
    LRUHandle** ptr = &list_[hash & (length_ - 1)];
    while (*ptr != NULL &&
           ((*ptr)->hash != hash || key != (*ptr)->key())) {
      ptr = &(*ptr)->next_hash;
    }
    return ptr;
  }

该方法返回值为一个二级指针,这里为什么要这样做呢?
正常对单链表进行插入或删除需要找到待插或待删位置的前驱节点然后修改相关指针,而这里返回待插或待删位置(next_hash)的二级指针,直接修改该二级指针对应内容即可完成插入或删除,简单方便。

为更好的解决访问冲突和减少锁开销,又封装了一层ShardedLRUCache,sharding到多个LRUCache,其中以key对应哈希值的最高四位进行分桶。

BlockCache的默认大小为8M,在db_impl.cc中SanitizeOptions函数未指定block_cache时初始默认大小,用户Options的构造函数初始化列表block_cache默认为NULL。
TableCache的大小在DBImpl的构造函数中由Options的max_open_files(默认1000)指定,并预留kNumNonTableCacheFiles(10)个可打开文件作为它用。

发表评论

电子邮件地址不会被公开。 必填项已用*标注