Concurrent Writes: RocksDB Source Code Analysis (3)

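An earlier post analyzed the LevelDB write path. There, a writer thread takes the lock and puts its write (a WriteBatch) on the write queue; if it is not at the head of the queue, it blocks until the head-of-queue thread has finished writing. The lock granularity is fairly coarse. Compared with LevelDB, RocksDB adds many fine-grained optimizations for concurrent writes from multiple threads. Let's go through them one by one.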

The WriteThread object held by DBImpl manages all writes in RocksDB when multiple threads are involved. Start with the basic data structures:

class WriteThread {
 public:
  enum State : uint8_t {
    // The initial state of a writer.  This is a Writer that is
    // waiting in JoinBatchGroup.  This state can be left when another
    // thread informs the waiter that it has become a group leader
    // (-> STATE_GROUP_LEADER), when a leader that has chosen to be
    // non-parallel informs a follower that its writes have been committed
    // (-> STATE_COMPLETED), or when a leader that has chosen to perform
    // updates in parallel and needs this Writer to apply its batch (->
    // STATE_PARALLEL_FOLLOWER).
    STATE_INIT = 1,

    // The state used to inform a waiting Writer that it has become the
    // leader, and it should now build a write batch group.  Tricky:
    // this state is not used if newest_writer_ is empty when a writer
    // enqueues itself, because there is no need to wait (or even to
    // create the mutex and condvar used to wait) in that case.  This is
    // a terminal state unless the leader chooses to make this a parallel
    // batch, in which case the last parallel worker to finish will move
    // the leader to STATE_COMPLETED.
    STATE_GROUP_LEADER = 2,

    // A Writer that has returned as a follower in a parallel group.
    // It should apply its batch to the memtable and then call
    // CompleteParallelWorker.  When someone calls ExitAsBatchGroupLeader
    // or EarlyExitParallelGroup this state will get transitioned to
    // STATE_COMPLETED.
    STATE_PARALLEL_FOLLOWER = 4,

    // A follower whose writes have been applied, or a parallel leader
    // whose followers have all finished their work.  This is a terminal
    // state.
    STATE_COMPLETED = 8,

    // A state indicating that the thread may be waiting using StateMutex()
    // and StateCondVar()
    STATE_LOCKED_WAITING = 16,
  };

  struct Writer;

  struct ParallelGroup {
    Writer* leader;
    Writer* last_writer;
    SequenceNumber last_sequence;
    bool early_exit_allowed;
    // before running goes to zero, status needs leader->StateMutex()
    Status status;
    std::atomic<uint32_t> running;
  };

  // Information kept for every waiting writer.
  struct Writer {
    WriteBatch* batch;
    bool sync;
    bool no_slowdown;
    bool disable_wal;
    bool disable_memtable;
    uint64_t log_used;  // log number that this batch was inserted into
    uint64_t log_ref;   // log number that memtable insert should reference
    bool in_batch_group;
    WriteCallback* callback;
    bool made_waitable;          // records lazy construction of mutex and cv
    std::atomic<uint8_t> state;  // write under StateMutex() or pre-link
    ParallelGroup* parallel_group;
    SequenceNumber sequence;  // the sequence number to use for the first key
    Status status;            // status of memtable inserter
    Status callback_status;   // status returned by callback->Callback()
    std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
    std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
    Writer* link_older;  // read/write only before linking, or as leader
    Writer* link_newer;  // lazy, read/write only before linking, or as leader
  };
};

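The State enum marks the write state of a Writer. It forms a state machine that allows finer-grained, in places lock-free, control of concurrent writes:
STATE_INIT: the initial state of a Writer (one write operation), waiting in JoinBatchGroup. It leaves this state when another thread tells it that it has become the group leader (-> STATE_GROUP_LEADER), when a non-parallel leader tells it that its write has been committed (-> STATE_COMPLETED), or when a parallel leader needs it to apply its own batch as a follower (-> STATE_PARALLEL_FOLLOWER).
STATE_GROUP_LEADER: tells a waiting Writer that it is now the leader and should build a write batch group. Note that if newest_writer_ is empty when a Writer enqueues itself, that Writer becomes leader immediately, so nothing has to wake it up (and no mutex or condition variable has to be created). This is a terminal state unless the leader chooses a parallel write, in which case the last parallel worker to finish moves the leader to STATE_COMPLETED.
STATE_PARALLEL_FOLLOWER: the Writer is a follower in a parallel group. It applies its own batch to the memtable and then calls CompleteParallelWorker. When some Writer calls ExitAsBatchGroupLeader or EarlyExitParallelGroup, this state transitions to STATE_COMPLETED.
STATE_COMPLETED: the normal terminal state. Either a follower whose write has been applied, or a parallel leader whose followers have all finished.
STATE_LOCKED_WAITING: indicates that the thread may be blocked waiting on StateMutex() and StateCondVar().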
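The state values are distinct powers of two, which lets a waiter block until its state matches any one of several goal states with a single bitmask test (JoinBatchGroup passes such a mask to AwaitState). The following is a minimal sketch of that idea only. The Waiter struct and both functions are simplified stand-ins written for this illustration: they always use a mutex and condition variable, whereas the real AwaitState tries spinning and yielding first and uses STATE_LOCKED_WAITING to coordinate with the waker.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Same bit values as WriteThread::State in the listing above.
enum State : uint8_t {
  STATE_INIT = 1,
  STATE_GROUP_LEADER = 2,
  STATE_PARALLEL_FOLLOWER = 4,
  STATE_COMPLETED = 8,
};

// Hypothetical, simplified stand-in for WriteThread::Writer.
struct Waiter {
  std::atomic<uint8_t> state{STATE_INIT};
  std::mutex mu;
  std::condition_variable cv;
};

// Publishes a new state and wakes the waiter if it is blocked.
void SetState(Waiter* w, uint8_t new_state) {
  {
    std::lock_guard<std::mutex> guard(w->mu);
    w->state.store(new_state, std::memory_order_release);
  }
  w->cv.notify_one();
}

// Blocks until the state has at least one bit in common with goal_mask,
// then returns the state that was observed.
uint8_t AwaitState(Waiter* w, uint8_t goal_mask) {
  std::unique_lock<std::mutex> lock(w->mu);
  w->cv.wait(lock, [&] {
    return (w->state.load(std::memory_order_acquire) & goal_mask) != 0;
  });
  return w->state.load(std::memory_order_acquire);
}
```

A follower would call AwaitState(&w, STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED) and then branch on the returned value, which is what WriteImpl does after JoinBatchGroup returns.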

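Now the Writer struct:
A Writer is created as a local object on the stack. It holds two raw buffers reserved with std::aligned_storage (state_mutex_bytes and state_cv_bytes). Only when synchronization is actually needed are a mutex and a condition variable constructed in them with placement new (made_waitable records that this has happened). Compared with allocating them with plain new, this avoids heap allocation, costs nothing for a Writer that never waits, and is friendlier to the cache.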
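To make the lazy-construction idea concrete, here is a small sketch. LazyWaiter and its member names are hypothetical, not RocksDB code. It uses alignas on byte arrays instead of std::aligned_storage and keeps the pointers returned by placement new, which sidesteps the reinterpret_cast the original relies on; the principle (reserve the storage inline, construct only on first use, destroy only if constructed) is the same.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <new>

// Hypothetical stand-in for the waiting machinery inside WriteThread::Writer.
struct LazyWaiter {
  // Raw, correctly aligned storage; no mutex or condvar exists yet.
  alignas(std::mutex) unsigned char mutex_bytes[sizeof(std::mutex)];
  alignas(std::condition_variable) unsigned char
      cv_bytes[sizeof(std::condition_variable)];
  std::mutex* mu = nullptr;
  std::condition_variable* cv = nullptr;

  LazyWaiter() = default;
  LazyWaiter(const LazyWaiter&) = delete;
  LazyWaiter& operator=(const LazyWaiter&) = delete;

  ~LazyWaiter() {
    // Placement-new objects need explicit destructor calls.
    if (mu != nullptr) {
      cv->~condition_variable();
      mu->~mutex();
    }
  }

  bool IsWaitable() const { return mu != nullptr; }

  // Constructs the mutex and condvar in place on first use; idempotent.
  void MakeWaitable() {
    if (mu == nullptr) {
      mu = new (mutex_bytes) std::mutex;
      cv = new (cv_bytes) std::condition_variable;
    }
  }

  std::mutex& StateMutex() {
    assert(mu != nullptr);
    return *mu;
  }

  std::condition_variable& StateCondVar() {
    assert(cv != nullptr);
    return *cv;
  }
};
```

A writer that becomes leader immediately never calls MakeWaitable, so it never pays for constructing or destroying the two synchronization objects.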

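As soon as a Writer is created, JoinBatchGroup is called. (This listing comes from a newer release that also supports pipelined write, so its wait mask mentions STATE_MEMTABLE_WRITER_LEADER and STATE_PARALLEL_MEMTABLE_WRITER, which do not appear in the enum above.)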

void WriteThread::JoinBatchGroup(Writer* w) {
  static AdaptationContext ctx("JoinBatchGroup");

  assert(w->batch != nullptr);
  bool linked_as_leader = LinkOne(w, &newest_writer_);
  if (linked_as_leader) {
    SetState(w, STATE_GROUP_LEADER);
  }

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);

  if (!linked_as_leader) {
    /**
     * Wait until:
     * 1) An existing leader pick us as the new leader when it finishes
     * 2) An existing leader pick us as its follower and
     * 2.1) finishes the memtable writes on our behalf
     * 2.2) Or tell us to finish the memtable writes in parallel
     * 3) (pipelined write) An existing leader pick us as its follower and
     *    finish book-keeping and WAL write for us, enqueue us as pending
     *    memtable writer, and
     * 3.1) we become memtable writer group leader, or
     * 3.2) an existing memtable writer group leader tell us to finish memtable
     *      writes in parallel.
     */
    AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
                      STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}

bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  assert(w->state == STATE_INIT);
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return (writers == nullptr);
    }
  }
}

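newest_writer_ is a std::atomic<Writer*>, and LinkOne reads and writes it in a lock-free manner.
LinkOne wraps std::atomic's compare_exchange_weak in a while loop. The weak form may fail spuriously, and on any failure it stores the value it actually found into writers, so the loop simply retries with the fresh head. Because no memory order is passed, the exchange uses the default and strictest one, memory_order_seq_cst. This compare-and-swap loop is the standard C++11 idiom for lock-free linking and needs no lock at all. The return value says whether the current writer became leader, that is, whether the queue was empty when it linked itself in.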
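The CAS loop can be tried out in isolation. The sketch below mirrors LinkOne on a stripped-down Node type; Node, LinkResult and RunConcurrentLinks are names made up for this example. However the threads interleave, exactly one of them should observe an empty list and become "leader", and every node should end up linked.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical minimal node; only the link used by LinkOne is kept.
struct Node {
  Node* link_older = nullptr;
};

// Pushes n onto the list headed by *newest.
// Returns true if the list was empty, i.e. n is the leader.
bool LinkOne(Node* n, std::atomic<Node*>* newest) {
  Node* head = newest->load(std::memory_order_relaxed);
  while (true) {
    n->link_older = head;
    // On failure, head is refreshed with the current value and we retry.
    if (newest->compare_exchange_weak(head, n)) {
      return head == nullptr;
    }
  }
}

struct LinkResult {
  int leaders;  // how many threads saw an empty list
  int length;   // how many nodes are reachable from the head
};

// Links one node per thread concurrently and reports what happened.
LinkResult RunConcurrentLinks(int num_threads) {
  std::atomic<Node*> newest{nullptr};
  std::atomic<int> leaders{0};
  std::vector<Node> nodes(num_threads);
  std::vector<std::thread> threads;
  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back([&, i] {
      if (LinkOne(&nodes[i], &newest)) {
        leaders.fetch_add(1);
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  int length = 0;
  for (Node* p = newest.load(); p != nullptr; p = p->link_older) {
    ++length;
  }
  return {leaders.load(), length};
}
```

Note that the list only has link_older pointers at this point. In the WriteThread struct shown earlier, link_newer is marked as lazy, so the forward links are filled in later by whoever needs to walk the list in arrival order.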

Let's take the write implementation apart, starting with the case where the current thread's Writer ends up as STATE_PARALLEL_FOLLOWER:

Status DBImpl::WriteImpl(const WriteOptions& write_options,
                         WriteBatch* my_batch, WriteCallback* callback,
                         uint64_t* log_used, uint64_t log_ref,
                         bool disable_memtable) {
  if (my_batch == nullptr) {
    return Status::Corruption("Batch is nullptr!");
  }

  Status status;

  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  WriteThread::Writer w;
  w.batch = my_batch;
  w.sync = write_options.sync;
  w.disable_wal = write_options.disableWAL;
  w.disable_memtable = disable_memtable;
  w.in_batch_group = false;
  w.callback = callback;
  w.log_ref = log_ref;

  if (!write_options.disableWAL) {
    RecordTick(stats_, WRITE_WITH_WAL);
  }

  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  write_thread_.JoinBatchGroup(&w);
  if (w.state == WriteThread::STATE_PARALLEL_FOLLOWER) {
    // we are a non-leader in a parallel group
    PERF_TIMER_GUARD(write_memtable_time);

    if (log_used != nullptr) {
      *log_used = w.log_used;
    }

    if (w.ShouldWriteToMemtable()) {
      ColumnFamilyMemTablesImpl column_family_memtables(
          versions_->GetColumnFamilySet());
      WriteBatchInternal::SetSequence(w.batch, w.sequence);
      w.status = WriteBatchInternal::InsertInto(
          &w, &column_family_memtables, &flush_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          true /*concurrent_memtable_writes*/);
    }

    if (write_thread_.CompleteParallelWorker(&w)) {
      // we're responsible for early exit
      auto last_sequence = w.parallel_group->last_sequence;
      versions_->SetLastSequence(last_sequence);
      write_thread_.EarlyExitParallelGroup(&w);
    }
    assert(w.state == WriteThread::STATE_COMPLETED);
    // STATE_COMPLETED conditional below handles exit

    status = w.FinalStatus();
  }
  if (w.state == WriteThread::STATE_COMPLETED) {
    if (log_used != nullptr) {
      *log_used = w.log_used;
    }
    // write is complete and leader has updated sequence
    RecordTick(stats_, WRITE_DONE_BY_OTHER);
    return w.FinalStatus();
  }
  // ... otherwise this writer is the group leader; that path is shown below ...
}

bool WriteThread::CompleteParallelWorker(Writer* w) {
  static AdaptationContext ctx("CompleteParallelWorker");

  auto* pg = w->parallel_group;
  if (!w->status.ok()) {
    std::lock_guard<std::mutex> guard(pg->leader->StateMutex());
    pg->status = w->status;
  }

  auto leader = pg->leader;
  auto early_exit_allowed = pg->early_exit_allowed;

  if (pg->running.load(std::memory_order_acquire) > 1 && pg->running-- > 1) {
    // we're not the last one
    AwaitState(w, STATE_COMPLETED, &ctx);

    // Caller only needs to perform exit duties if early exit doesn't
    // apply and this is the leader.  Can't touch pg here.  Whoever set
    // our state to STATE_COMPLETED copied pg->status to w.status for us.
    return w == leader && !(early_exit_allowed && w->status.ok());
  }
  // else we're the last parallel worker

  if (w == leader || (early_exit_allowed && pg->status.ok())) {
    // this thread should perform exit duties
    w->status = pg->status;
    return true;
  } else {
    // We're the last parallel follower but early commit is not
    // applicable.  Wake up the leader and then wait for it to exit.
    assert(w->state == STATE_PARALLEL_FOLLOWER);
    SetState(leader, STATE_COMPLETED);
    AwaitState(w, STATE_COMPLETED, &ctx);
    return false;
  }
}

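If JoinBatchGroup returns with the current thread's Writer in STATE_PARALLEL_FOLLOWER, the thread uses MemTableInserter to write its own WriteBatch concurrently into the MemTables reached through ColumnFamilyMemTablesImpl, without taking a lock. By default the MemTable type is determined by the memtable_factory option, which ColumnFamilyOptions inherits from AdvancedColumnFamilyOptions and which defaults to SkipListFactory. SkipListFactory's CreateMemTableRep creates a SkipListRep, and SkipListRep uses InlineSkipList internally. This is essential for understanding which concurrent write mode the current MemTable supports.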
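CompleteParallelWorker in the listing above has to decide which worker of the parallel group finished last, because that worker (or the leader it wakes) performs the exit duties. The core of that decision is an atomic countdown. The sketch below shows only the countdown; GroupSketch and FinishAndCheckLast are hypothetical names, and the real function additionally propagates error status, honors early_exit_allowed and blocks the non-last workers in AwaitState.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical reduction of WriteThread::ParallelGroup to its counter.
struct GroupSketch {
  std::atomic<uint32_t> running;
  explicit GroupSketch(uint32_t workers) : running(workers) {}
};

// Called by each worker after it has applied its batch.
// Returns true for exactly one caller: the one that finishes last.
bool FinishAndCheckLast(GroupSketch* g) {
  // fetch_sub returns the value before the decrement, so the caller that
  // sees 1 is the one that brought the counter to zero.
  uint32_t before = g->running.fetch_sub(1, std::memory_order_acq_rel);
  assert(before > 0);
  return before == 1;
}
```

The acquire-release ordering on the decrement is what lets the last worker safely read results written by the others before they decremented.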

When the current thread's Writer is STATE_GROUP_LEADER:

  // else we are the leader of the write batch group
  assert(w.state == WriteThread::STATE_GROUP_LEADER);

  WriteContext context;
  mutex_.Lock();

  if (!write_options.disableWAL) {
    default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1);
  }

  RecordTick(stats_, WRITE_DONE_BY_SELF);
  default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);

  // Once reaches this point, the current writer "w" will try to do its write
  // job.  It may also pick up some of the remaining writers in the "writers_"
  // when it finds suitable, and finish them in the same write batch.
  // This is how a write job could be done by the other writer.
  assert(!single_column_family_mode_ ||
         versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);

  if (UNLIKELY(!single_column_family_mode_ &&
               total_log_size_ > GetMaxTotalWalSize())) {
    MaybeFlushColumnFamilies();
  }
  if (UNLIKELY(write_buffer_manager_->ShouldFlush())) {
    // Before a new memtable is added in SwitchMemtable(),
    // write_buffer_manager_->ShouldFlush() will keep returning true. If another
    // thread is writing to another DB with the same write buffer, they may also
    // be flushed. We may end up with flushing much more DBs than needed. It's
    // suboptimal but still correct.
    Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log,
        "Flushing column family with largest mem table size. Write buffer is "
        "using %" PRIu64 " bytes out of a total of %" PRIu64 ".",
        write_buffer_manager_->memory_usage(),
        write_buffer_manager_->buffer_size());
    // no need to refcount because drop is happening in write thread, so can't
    // happen while we're in the write thread
    ColumnFamilyData* largest_cfd = nullptr;
    size_t largest_cfd_size = 0;

    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (!cfd->mem()->IsEmpty()) {
        // We only consider active mem table, hoping immutable memtable is
        // already in the process of flushing.
        size_t cfd_size = cfd->mem()->ApproximateMemoryUsage();
        if (largest_cfd == nullptr || cfd_size > largest_cfd_size) {
          largest_cfd = cfd;
          largest_cfd_size = cfd_size;
        }
      }
    }
    if (largest_cfd != nullptr) {
      status = SwitchMemtable(largest_cfd, &context);
      if (status.ok()) {
        largest_cfd->imm()->FlushRequested();
        SchedulePendingFlush(largest_cfd);
        MaybeScheduleFlushOrCompaction();
      }
    }
  }

  if (UNLIKELY(status.ok() && !bg_error_.ok())) {
    status = bg_error_;
  }

  if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
    status = ScheduleFlushes(&context);
  }

  if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
                               write_controller_.NeedsDelay()))) {
    PERF_TIMER_STOP(write_pre_and_post_process_time);
    PERF_TIMER_GUARD(write_delay_time);
    // We don't know size of current batch so that we always use the size
    // for previous one. It might create a fairness issue that expiration
    // might happen for smaller writes but larger writes can go through.
    // Can optimize it if it is an issue.
    status = DelayWrite(last_batch_group_size_, write_options);
    PERF_TIMER_START(write_pre_and_post_process_time);
  }

Status DBImpl::DelayWrite(uint64_t num_bytes,
                          const WriteOptions& write_options) {
  uint64_t time_delayed = 0;
  bool delayed = false;
  {
    StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
    auto delay = write_controller_.GetDelay(env_, num_bytes);
    if (delay > 0) {
      if (write_options.no_slowdown) {
        return Status::Incomplete();
      }
      mutex_.Unlock();
      delayed = true;
      TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
      // hopefully we don't have to sleep more than 2 billion microseconds
      env_->SleepForMicroseconds(static_cast<int>(delay));
      mutex_.Lock();
    }

    while (bg_error_.ok() && write_controller_.IsStopped()) {
      if (write_options.no_slowdown) {
        return Status::Incomplete();
      }
      delayed = true;
      TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
      bg_cv_.Wait();
    }
  }
  assert(!delayed || !write_options.no_slowdown);
  if (delayed) {
    default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_STALL_MICROS,
                                           time_delayed);
    RecordTick(stats_, STALL_MICROS, time_delayed);
  }

  return bg_error_;
}

// This is inside DB mutex, so we can't sleep and need to minimize
// frequency to get time.
// If it turns out to be a performance issue, we can redesign the thread
// synchronization model here.
// The function trust caller will sleep micros returned.
uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
  if (total_stopped_ > 0) {
    return 0;
  }
  if (total_delayed_ == 0) {
    return 0;
  }

  const uint64_t kMicrosPerSecond = 1000000;
  const uint64_t kRefillInterval = 1024U;

  if (bytes_left_ >= num_bytes) {
    bytes_left_ -= num_bytes;
    return 0;
  }
  // The frequency to get time inside DB mutex is less than one per refill
  // interval.
  auto time_now = NowMicrosMonotonic(env);

  uint64_t sleep_debt = 0;
  uint64_t time_since_last_refill = 0;
  if (last_refill_time_ != 0) {
    if (last_refill_time_ > time_now) {
      sleep_debt = last_refill_time_ - time_now;
    } else {
      time_since_last_refill = time_now - last_refill_time_;
      bytes_left_ +=
          static_cast<uint64_t>(static_cast<double>(time_since_last_refill) /
                                kMicrosPerSecond * delayed_write_rate_);
      if (time_since_last_refill >= kRefillInterval &&
          bytes_left_ > num_bytes) {
        // If refill interval already passed and we have enough bytes
        // return without extra sleeping.
        last_refill_time_ = time_now;
        bytes_left_ -= num_bytes;
        return 0;
      }
    }
  }

  uint64_t single_refill_amount =
      delayed_write_rate_ * kRefillInterval / kMicrosPerSecond;
  if (bytes_left_ + single_refill_amount >= num_bytes) {
    // Wait until a refill interval
    // Never trigger expire for less than one refill interval to avoid to get
    // time.
    bytes_left_ = bytes_left_ + single_refill_amount - num_bytes;
    last_refill_time_ = time_now + kRefillInterval;
    return kRefillInterval + sleep_debt;
  }

  // Need to refill more than one interval. Need to sleep longer. Check
  // whether expiration will hit

  // Sleep just until `num_bytes` is allowed.
  uint64_t sleep_amount =
      static_cast<uint64_t>(num_bytes /
                            static_cast<long double>(delayed_write_rate_) *
                            kMicrosPerSecond) +
      sleep_debt;
  last_refill_time_ = time_now + sleep_amount;
  return sleep_amount;
}

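As STATE_GROUP_LEADER, the thread first takes the DB mutex. write_buffer_manager_ is the global controller supplied through DBOptions; its limit comes from DBOptions::db_write_buffer_size, which is separate from the per-memtable write_buffer_size in ColumnFamilyOptions (64MB by default). It checks whether the live memtables of all DBs sharing it have reached the limit. If a flush is needed, the leader picks the column family with the largest active memtable in the current DB and calls SwitchMemtable on it, which decides whether a new log has to be created or an existing log file can be reused, then creates a new memtable and switches to it. The leader then schedules the flush and, through MaybeScheduleFlushOrCompaction, possibly a compaction.
Since the previous step may have triggered a flush or compaction, write_controller_ then checks whether the write has to be delayed. RocksDB has put considerable care into this part.
Look at DelayWrite. delayed_write_rate in DBOptions defaults to 16MB/s in this version (the maximum write rate while writes are being throttled), and last_batch_group_size_ (the size of the previous write group) serves as the estimate for how many microseconds the current thread should pause. To limit how often the clock is read under the DB mutex, GetDelay adjusts the wait in units of kRefillInterval (1024 microseconds, roughly 1 ms). After the sleep, DelayWrite checks whether writes are stopped altogether; if so, it blocks on the condition variable bg_cv_ and gives up the CPU until the stop is lifted or a background error occurs.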
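The arithmetic in GetDelay is easier to follow with concrete numbers. The sketch below restates the refill logic as a small standalone struct. It is a simplification written for this post, not the RocksDB class: the current time is passed in explicitly instead of being read from Env, and the total_stopped_ / total_delayed_ early returns are left out. DelaySketch and its member names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

constexpr uint64_t kMicrosPerSecond = 1000000;
constexpr uint64_t kRefillInterval = 1024;  // microseconds

// Hypothetical restatement of the byte budget kept by WriteController.
struct DelaySketch {
  uint64_t delayed_write_rate;    // bytes per second
  uint64_t bytes_left = 0;        // budget that can be spent without sleeping
  uint64_t last_refill_time = 0;  // microseconds, 0 means "never refilled"

  explicit DelaySketch(uint64_t rate) : delayed_write_rate(rate) {}

  // Returns how many microseconds the caller should sleep before writing.
  uint64_t GetDelay(uint64_t time_now, uint64_t num_bytes) {
    if (bytes_left >= num_bytes) {  // enough budget, no clock needed
      bytes_left -= num_bytes;
      return 0;
    }
    uint64_t sleep_debt = 0;
    if (last_refill_time != 0) {
      if (last_refill_time > time_now) {
        // A previously requested sleep has not fully elapsed yet.
        sleep_debt = last_refill_time - time_now;
      } else {
        uint64_t elapsed = time_now - last_refill_time;
        bytes_left += static_cast<uint64_t>(
            static_cast<double>(elapsed) / kMicrosPerSecond *
            delayed_write_rate);
        if (elapsed >= kRefillInterval && bytes_left > num_bytes) {
          last_refill_time = time_now;
          bytes_left -= num_bytes;
          return 0;
        }
      }
    }
    // Budget granted by waiting for exactly one refill interval.
    uint64_t single_refill =
        delayed_write_rate * kRefillInterval / kMicrosPerSecond;
    if (bytes_left + single_refill >= num_bytes) {
      bytes_left = bytes_left + single_refill - num_bytes;
      last_refill_time = time_now + kRefillInterval;
      return kRefillInterval + sleep_debt;
    }
    // Larger write: sleep just long enough for num_bytes at the given rate.
    uint64_t sleep_amount =
        static_cast<uint64_t>(num_bytes /
                              static_cast<long double>(delayed_write_rate) *
                              kMicrosPerSecond) +
        sleep_debt;
    last_refill_time = time_now + sleep_amount;
    return sleep_amount;
  }
};
```

With the 16MB/s rate quoted above, one refill interval grants about 17 KB of budget. A first 1 KB write therefore sleeps one interval (1024 microseconds) and leaves budget behind, an immediately following 1 KB write sleeps 0, and a single 16MB write on a fresh limiter sleeps a full second.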

Next, as leader, the current write thread applies this round's writes to the WAL and the MemTable:

  uint64_t last_sequence = versions_->LastSequence();
  WriteThread::Writer* last_writer = &w;
  autovector<WriteThread::Writer*> write_group;
  bool need_log_sync = !write_options.disableWAL && write_options.sync;
  bool need_log_dir_sync = need_log_sync && !log_dir_synced_;

  bool logs_getting_synced = false;
  if (status.ok()) {
    if (need_log_sync) {
      while (logs_.front().getting_synced) {
        log_sync_cv_.Wait();
      }
      for (auto& log : logs_) {
        assert(!log.getting_synced);
        log.getting_synced = true;
      }
      logs_getting_synced = true;
    }

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into memtables
  }
  log::Writer* cur_log_writer = logs_.back().writer;

  mutex_.Unlock();

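Before writing the WAL, while the leader still holds the lock, and provided the write has not disabled the WAL and asks for sync, the leader waits for any sync already in progress to finish, marks every WAL file as getting_synced, and notes whether the WAL directory needs syncing too. It also obtains the log writer object for the WAL this round will be written to. After that the mutex can be released and the actual write begins.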

  // At this point the mutex is unlocked

  bool exit_completed_early = false;
  last_batch_group_size_ =
      write_thread_.EnterAsBatchGroupLeader(&w, &last_writer, &write_group);

  if (status.ok()) {
    // Rules for when we can update the memtable concurrently
    // 1. supported by memtable
    // 2. Puts are not okay if inplace_update_support
    // 3. Deletes or SingleDeletes are not okay if filtering deletes
    //    (controlled by both batch and memtable setting)
    // 4. Merges are not okay
    //
    // Rules 1..3 are enforced by checking the options
    // during startup (CheckConcurrentWritesSupported), so if
    // options.allow_concurrent_memtable_write is true then they can be
    // assumed to be true.  Rule 4 is checked for each batch.  We could
    // relax rules 2 and 3 if we could prevent write batches from referring
    // more than once to a particular key.
    bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
                    write_group.size() > 1;
    int total_count = 0;
    uint64_t total_byte_size = 0;
    for (auto writer : write_group) {
      if (writer->CheckCallback(this)) {
        if (writer->ShouldWriteToMemtable()) {
          total_count += WriteBatchInternal::Count(writer->batch);
          parallel = parallel && !writer->batch->HasMerge();
        }

        if (writer->ShouldWriteToWAL()) {
          total_byte_size = WriteBatchInternal::AppendedByteSize(
              total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
        }
      }
    }

    const SequenceNumber current_sequence = last_sequence + 1;
    last_sequence += total_count;

    // Record statistics
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
    MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
    PERF_TIMER_STOP(write_pre_and_post_process_time);

    if (write_options.disableWAL) {
      has_unpersisted_data_.store(true, std::memory_order_relaxed);
    }

    uint64_t log_size = 0;
    if (!write_options.disableWAL) {
      PERF_TIMER_GUARD(write_wal_time);

      WriteBatch* merged_batch = nullptr;
      if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL() &&
          write_group[0]->batch->GetWalTerminationPoint().is_cleared()) {
        // we simply write the first WriteBatch to WAL if the group only
        // contains one batch, that batch should be written to the WAL,
        // and the batch is not wanting to be truncated
        merged_batch = write_group[0]->batch;
        write_group[0]->log_used = logfile_number_;
      } else {
        // WAL needs all of the batches flattened into a single batch.
        // We could avoid copying here with an iov-like AddRecord
        // interface
        merged_batch = &tmp_batch_;
        for (auto writer : write_group) {
          if (writer->ShouldWriteToWAL()) {
            WriteBatchInternal::Append(merged_batch, writer->batch,
                                       /*WAL_only*/ true);
          }
          writer->log_used = logfile_number_;
        }
      }

      if (log_used != nullptr) {
        *log_used = logfile_number_;
      }

      WriteBatchInternal::SetSequence(merged_batch, current_sequence);

      Slice log_entry = WriteBatchInternal::Contents(merged_batch);
      status = cur_log_writer->AddRecord(log_entry);
      total_log_size_ += log_entry.size();
      alive_log_files_.back().AddSize(log_entry.size());
      log_empty_ = false;
      log_size = log_entry.size();
      RecordTick(stats_, WAL_FILE_BYTES, log_size);
      if (status.ok() && need_log_sync) {
        RecordTick(stats_, WAL_FILE_SYNCED);
        StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
        // It's safe to access logs_ with unlocked mutex_ here because:
        //  - we've set getting_synced=true for all logs,
        //    so other threads won't pop from logs_ while we're here,
        //  - only writer thread can push to logs_, and we're in
        //    writer thread, so no one will push to logs_,
        //  - as long as other threads don't modify it, it's safe to read
        //    from std::deque from multiple threads concurrently.
        for (auto& log : logs_) {
          status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
          if (!status.ok()) {
            break;
          }
        }
        if (status.ok() && need_log_dir_sync) {
          // We only sync WAL directory the first time WAL syncing is
          // requested, so that in case users never turn on WAL sync,
          // we can avoid the disk I/O in the write code path.
          status = directories_.GetWalDir()->Fsync();
        }
      }

      if (merged_batch == &tmp_batch_) {
        tmp_batch_.Clear();
      }
    }
    if (status.ok()) {
      PERF_TIMER_GUARD(write_memtable_time);

      {
        // Update stats while we are an exclusive group leader, so we know
        // that nobody else can be writing to these particular stats.
        // We're optimistic, updating the stats before we successfully
        // commit.  That lets us release our leader status early in
        // some cases.
        auto stats = default_cf_internal_stats_;
        stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
        stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count);
        if (!write_options.disableWAL) {
          if (write_options.sync) {
            stats->AddDBStats(InternalStats::WAL_FILE_SYNCED, 1);
          }
          stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size);
        }
        uint64_t for_other = write_group.size() - 1;
        if (for_other > 0) {
          stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, for_other);
          if (!write_options.disableWAL) {
            stats->AddDBStats(InternalStats::WRITE_WITH_WAL, for_other);
          }
        }
      }

      if (!parallel) {
        status = WriteBatchInternal::InsertInto(
            write_group, current_sequence, column_family_memtables_.get(),
            &flush_scheduler_, write_options.ignore_missing_column_families,
            0 /*log_number*/, this);

        if (status.ok()) {
          // There were no write failures. Set leader's status
          // in case the write callback returned a non-ok status.
          status = w.FinalStatus();
        }

      } else {
        WriteThread::ParallelGroup pg;
        pg.leader = &w;
        pg.last_writer = last_writer;
        pg.last_sequence = last_sequence;
        pg.early_exit_allowed = !need_log_sync;
        pg.running.store(static_cast<uint32_t>(write_group.size()),
                         std::memory_order_relaxed);
        write_thread_.LaunchParallelFollowers(&pg, current_sequence);

        if (w.ShouldWriteToMemtable()) {
          // do leader write
          ColumnFamilyMemTablesImpl column_family_memtables(
              versions_->GetColumnFamilySet());
          assert(w.sequence == current_sequence);
          WriteBatchInternal::SetSequence(w.batch, w.sequence);
          w.status = WriteBatchInternal::InsertInto(
              &w, &column_family_memtables, &flush_scheduler_,
              write_options.ignore_missing_column_families, 0 /*log_number*/,
              this, true /*concurrent_memtable_writes*/);
        }

        // CompleteParallelWorker returns true if this thread should
        // handle exit, false means somebody else did
        exit_completed_early = !write_thread_.CompleteParallelWorker(&w);
        status = w.FinalStatus();
      }

      if (!exit_completed_early && w.status.ok()) {
        versions_->SetLastSequence(last_sequence);
        if (!need_log_sync) {
          write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status);
          exit_completed_early = true;
        }
      }

      // A non-OK status here indicates that the state implied by the
      // WAL has diverged from the in-memory state.  This could be
      // because of a corrupt write_batch (very bad), or because the
      // client specified an invalid column family and didn't specify
      // ignore_missing_column_families.
      //
      // Is setting bg_error_ enough here?  This will at least stop
      // compaction and fail any further writes.
      if (!status.ok() && bg_error_.ok() && !w.CallbackFailed()) {
        bg_error_ = status;
      }
    }
  }
  PERF_TIMER_START(write_pre_and_post_process_time);

  if (immutable_db_options_.paranoid_checks && !status.ok() &&
      !w.CallbackFailed() && !status.IsBusy() && !status.IsIncomplete()) {
    mutex_.Lock();
    if (bg_error_.ok()) {
      bg_error_ = status;  // stop compaction & fail any further writes
    }
    mutex_.Unlock();
  }

  if (logs_getting_synced) {
    mutex_.Lock();
    MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
    mutex_.Unlock();
  }

  if (!exit_completed_early) {
    write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status);
  }

  return status;

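EnterAsBatchGroupLeader returns the total size written this round by the current thread as leader together with all of its followers (this is last_batch_group_size_, the reference quantity DelayWrite uses to estimate the pause). Followers' writes are merged into the batch group in arrival order.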
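As a rough illustration of the grouping step, the sketch below walks a queue in arrival order, admits writers until a byte cap would be exceeded, and hands each admitted writer its first sequence number the way the leader code does with current_sequence and total_count. Everything here is an assumption made for the example: PendingWrite, BuildGroup and the max_group_bytes parameter are hypothetical, the real EnterAsBatchGroupLeader applies further compatibility checks, and in RocksDB the sequence numbers are assigned in a later step rather than while the group is built.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, reduced view of a queued writer.
struct PendingWrite {
  size_t byte_size;    // encoded size of its batch
  uint32_t key_count;  // number of entries in its batch
  uint64_t sequence;   // first sequence number, set once grouped
};

// queue is in arrival order and queue[0] is the leader.
// Fills *group with the admitted writers and returns their total byte size.
size_t BuildGroup(std::vector<PendingWrite>& queue, size_t max_group_bytes,
                  uint64_t last_sequence, std::vector<PendingWrite*>* group) {
  assert(!queue.empty());
  group->clear();
  size_t total_bytes = 0;
  uint64_t next_sequence = last_sequence + 1;
  for (PendingWrite& w : queue) {
    // The leader always joins; a follower that would exceed the cap ends
    // the group, so arrival order is preserved.
    if (!group->empty() && total_bytes + w.byte_size > max_group_bytes) {
      break;
    }
    w.sequence = next_sequence;
    next_sequence += w.key_count;  // each entry consumes one sequence number
    total_bytes += w.byte_size;
    group->push_back(&w);
  }
  return total_bytes;
}
```

The returned total plays the role of last_batch_group_size_: it is what the next leader would feed into DelayWrite.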
