ceph源码分析之读写操作流程(2)上一篇介绍了ceph存储在上两层的消息逻辑,这一篇主要介绍一下读写操作在底两层的流程。
下图是上一篇消息流程的一个总结。
上在ceph中,读写操作由于分布式存储的原因,故走了不同流程。
对于读操作而言:1.客户端直接计算出存储数据所属于的主osd,直接给主osd 上发送消息。
2.主osd收到消息后,可以调用Filestore直接读取处在底层文件系统中的主pg里面的内容然后返回给客户端。
具体调用函数在ReplicatedPG::do_osd_ops中实现。
读操作代码流程如图:如我们之前说的,当确定读操作为主osd的消息时(CEPH_MSG_OSD_OP类型),会调用到ReplicatePG::do_osd_op函数,该函数对类型做进一步判断,当发现为读类型(CEPH_OSD_OP_READ)时,会调用FileStore中的函数对磁盘上数据进行读。
[cpp] view plain copy int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) { ……switch (op.op) { …… caseCEPH_OSD_OP_READ: ++ctx->num_read; { // read into a buffer bufferlistbl; int r = osd->store->read(coll, soid, op.extent.offset, op.extent.length, bl); // 调用FileStore::read从底层文件系统读取……} caseCEPH_OSD_OP_WRITE:++ctx->num_write; { ……//写操作只是做准备工作,并不实际的写} ……} } FileStore::read 函数是底层具体的实现,会通过调用系统函数如::open,::pread,::close等函数来完成具体的操作。
[cpp] view plain copy int FileStore::read( coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len, bufferlist& bl, bool allow_eio) { ……int r = lfn_open(cid, oid, false, &fd); ……got = safe_pread(**fd, bptr.c_str(), len, offset);//FileStore::safe_pread中调用了::pread ……lfn_close(fd); ……} 而对于写操作而言,由于要保证数据写入的同步性就会复杂很多:1.首先客户端会将数据发送给主osd,2.主osd同样要先进行写操作预处理,完成后它要发送写消息给其他的从osd,让他们对副本pg进行更改,3.从osd通过FileJournal完成写操作到Journal中后发送消息告诉主osd说完成,进入54.当主osd收到所有的从osd完成写操作的消息后,会通过FileJournal完成自身的写操作到Journal中。
完成后会通知客户端,已经完成了写操作。
5.主osd,从osd的线程开始工作调用Filestore将Journal中的数据写入到底层文件系统中。
在介绍写操作的流程前,需要先介绍一下ceph中的callback函数。
Context类定义在src/include文件中,该类是一个回调函数类的抽象类,继承它的类只要在子类实现它的finish函数,在finish函数调用自己需要回调的函数,就可以完成回调。
[cpp] view plain copy class Context { Context(const Context& other); const Context&operator=(const Context& other); protected:virtual void finish(int r) = 0; public: Context() {} virtual ~Context() {} // we want a virtual destructor!!! virtual void complete(int r) { finish(r); delete this; } }; Finisher类是在src/common中定义的一个专门查看操作是否结束的一个类。
在这个类里面拥有一个线程finisher_thread和一个类型为Context指针的队列finisher_queue。
当一个操作线程完成自己的操作后,会将Context类型对象送入队列。
此时finisher_thread线程循环监视着自己的finisher_queue队列,当发现了有新进入的Context时,会调用这个Context::complete函数,这个函数则会调用到Context子类自己实现的finish函数。
来处理操作完成后的后续工作。
[cpp] view plain copy class Finisher { CephContext*cct; …… vector<Context*>finisher_queue; …… void*finisher_thread_entry(); struct FinisherThread : public Thread { Finisher *fin;FinisherThread(Finisher *f) : fin(f) {} void* entry() { return (void*)fin->finisher_thread_entry(); } } finisher_thread; …… } void*Finisher::finisher_thread_entry() { ……while(!finisher_stop){ while(!finisher_queue.empty( )){ ……vector<Context*> lsls.swap(finisher_queue); for(vector<Context*>::iterator p = ls.begin();p != ls.end(); ++p) { if (*p) { //这里面调用Context子类实现的finish函数(*p)->complete(0); } } } } } 在写操作中涉及了多个线程和消息队列的协同工作,需要注意的是一个类拥有一个Finisher 成员时,以为着它同时获得了一个队列和一个执行线程。
OSD中处理读写操作是线程池和消息队列(有很多,其他暂时不讨论):[cpp] view plain copy ThreadPool op_tp;ThreadPool::WorkQueueVal<pair<PGRef,OpRequestRef& gt;, PGRef> &op_wq;FileJournal中拥有的线程和消息队列:[cpp] view plain copy Write write_thread;deque<write_item> writeq;其父类Journal中拥有线程和消息队列(引用自之前说的JournalingObjectStore类中):[cpp] view plain copy Finisher finisher_thread; FileStore中拥有的线程和消息队列:[cpp] view plain copy ThreadPool op_tp; OpWQop_wq;//Filestore中实现,继承自ThreadPool::WorkQueue<OpSequencer> Finisher ondisk_finisher; Finisher op_finisher; 前一章说过在前两层,OSD根据不同的消息类型,选择了主OSD处理和从OSD的处理,以下介绍的写流程,就是在收到具体写操作以后,本地OSD开始的工作。
写的逻辑流程图如图:从图中我们可以看到写操作分为以下几步:1.OSD::op_tp线程从OSD::op_wq中拿出来操作如本文开始的图上描述,具体代码流是ReplicatePG::apply_repop中创建回调类C_OSD_OpCommit和C_OSD_OpApplied FileStore::queue_transactions中创建了回调类C_JournaledAhead2.FileJournal::write_thread线程从FileJournal::writeq中拿出来操作,主要就是写数据到具体的journal中,具体代码流:3.Journal::Finisher.finisher_thread线程从Journal::Finisher.finish_queue中拿出来操作,通过调用C_JournalAhead留下的回调函数FileStore:_journaled_ahead,该线程开始工作两件事:首先入底层FileStore::op_wq通知开始写,再入FileStore::ondisk_finisher.finisher_queue通知可以返回。
具体代码流:4.FileStore::ondisk_finisher.finisher_thread 线程从FileStore::ondisk_finisher.finisher_queue中拿出来操作,通过调用C_OSD_OpCommit留下来的回调函数ReplicatePG::op_commit,通知客户端写操作成功5.FileStore::op_tp线程池从FileStore::op_wq中拿出操作(此处的OP_WQ继承了父类ThreadPool::WorkQueue重写了_process和_process_finish等函数,所以不同于OSD::op_wq,它有自己的工作流程),首先调用FileStore::_do_op,完成后调用FileStore::_finish_op。
6.FileStore::op_finisher.finisher_thread线程从FileStore::op_finisher.finisher_queue中拿出来操作,通过调用C_OSD_OpApplied留下来的回调函数ReplicatePG::op_applied,通知数据可读。