当前位置: 首页 > news >正文

贵阳好的网站建设2000元代理微信朋友圈广告

贵阳好的网站建设,2000元代理微信朋友圈广告,网站开发 公司,区网站开发语言项目第十一弹:客户端设计与消费者管理模块 一、客户端设计1.模块大框架2.RabbitMQ弱化客户端提高灵活性3.模块划分1.消费者描述模块1.为何要有消费者模块?2.需要实现额外的管理模块吗?3.为何服务器要实现额外的管理模块呢? 2.异步…

项目第十一弹:客户端设计与消费者管理模块

  • 一、客户端设计
    • 1.模块大框架
    • 2.RabbitMQ弱化客户端提高灵活性
    • 3.模块划分
      • 1.消费者描述模块
        • 1.为何要有消费者模块?
        • 2.需要实现额外的管理模块吗?
        • 3.为何服务器要实现额外的管理模块呢?
      • 2.异步工作线程模块
        • 1.EventLoopThread
        • 2.异步工作线程池
  • 二、消费者模块实现
  • 三、信道管理模块设计与实现
    • 1.BUG?如何解决
    • 2.信道模块的实现
      • 1.对用户提供的服务的实现
      • 2.对连接模块提供的接口
      • 3.为何客户端的Channel也要有创建信道这个函数?
      • 4.完整代码
    • 3.信道管理模块的实现
  • 四、异步工作线程模块
  • 五、连接模块
    • 0.连接模块的细节点
    • 1.信道池
    • 2.实现信道池,资源隔离会受到影响吗?
    • 3.网络搭建模块的实现
    • 4.打开关闭信道的实现
    • 5.两个响应回调函数的实现
    • 6.完整代码

一、客户端设计

1.模块大框架

客户端其实就很简单了
只需要一个网络模块和业务模块

网络模块负责跟服务器进行通信
业务模块负责构建并发送网络请求,接收并处理网络响应,接收消息进行消费或者向服务器发送消息
在这里插入图片描述

2.RabbitMQ弱化客户端提高灵活性

在这里插入图片描述

3.模块划分

  1. 连接模块(无需管理,因为一个连接就是一个客户端)
  2. 信道管理模块(通过向服务器发送请求来给用户提供相应服务)

那我们还需要什么模块吗?

1.消费者描述模块

1.为何要有消费者模块?

我们之前说过,服务器的消费者描述模块当中的消费处理回调函数只负责:

向该消费者发送消费响应

因为对应真正的消费处理回调函数必须在消费者客户端进行执行
所以客户端也必须要有消费处理回调函数来

真正处理对应的消息,并且确认该消息

2.需要实现额外的管理模块吗?

只不过在客户端这里,因为我们规定一个消费者只能关联一个信道,所以消费者和信道天生就已经绑定到了一起,因此我们无需实现消费者管理模块
只需要实现一个消费者描述结构体即可

3.为何服务器要实现额外的管理模块呢?

服务器也是一个消费者只能关联一个信道啊,为何他就需要实现呢?

因为服务器当中我们要根据队列来负载均衡式选择一个消费者进行消息的推送,因此在服务器模块,消费者必须要跟队列建立联系

所以我们选择将消费者以队列为单位进行管理,而不是信道
【信道的话,在将队列中的消息推送给消费者时就需要遍历所有信道,查找其关联的消费者是否是我这个队列,那样的话效率非常低,而且代码极其不优雅】

而按照队列为单位进行管理,效率高,代码优雅,还能实现负载均衡

2.异步工作线程模块

1.EventLoopThread

在这里插入图片描述

2.异步工作线程池

为何我们需要异步工作线程池?

因为我们的消费处理回调函数是一个相对独立,解耦的函数
所以为了解放我们的信道服务执行流,将该函数打包扔到工作线程池当中去完成

将这二者结合起来组成一个新的模块,这个模块就是异步工作线程模块

二、消费者模块实现

其实跟服务器那里的消费者描述一样的:

using ConsumerCallback = std::function<void(const std::string &, const ns_proto::BasicProperities *, const std::string &)>;
struct Consumer
{using ptr = std::shared_ptr<Consumer>;Consumer() = default;Consumer(const std::string &tag, const ConsumerCallback &callback, const std::string &vhost_name, const std::string &qname, bool auto_ack): _consumer_tag(tag), _callback(callback), _vhost_name(vhost_name), _queue_name(qname), _auto_ack(auto_ack) {}std::string _consumer_tag;  // 消费者tag(唯一标识)ConsumerCallback _callback; // 消费者回调函数std::string _vhost_name;    // 消费者队列所在虚拟机名称std::string _queue_name;    // 消费者订阅的队列bool _auto_ack;             // 自动确认标志
};

三、信道管理模块设计与实现

1.BUG?如何解决

跟服务器的信道管理模块差不多,只不过它是组织请求,发送给服务器。
但是有一点需要注意:
因为我们实现的网络服务接口是非阻塞的,而我们用户执行的任务需要一种顺序性,此时就需要条件变量来控制任务执行的同步性

比如:创建队列、绑定队列
客户端执行完创建队列之后,其本质就只是向服务器发送了一个DeclareMsgQueueRequest而已,并未立刻创建该队列

而如果此时直接返回,那么用户就能执行下一步的绑定队列了
一旦BindRequest到达服务器更早,那么队列绑定必然失败,而队列创建却能够成功
此时就坑了,这就是BUG

所以需要条件变量

可是我们怎么确定我们的请求成功被服务器接收并执行了?
通过我们请求当中的req_id即可,它是请求的唯一标识,我们只需要搞一个unordered_map<req_id,Request>的哈希表即可

信道管理模块跟服务器的一样,都是增、删、查,互斥锁+哈希表

2.信道模块的实现

1.对用户提供的服务的实现

注意:信道创建时我们无需用户来提供信道ID,而只需要我们自己在内部创建即可,无需用户操心,这样可以提高用户体验

刚才说了,其实就是:

  1. 利用参数构建请求
  2. 发送请求
  3. 等待响应
  4. 返回结果

因此我们就可以写出,就是代码多一些,其实思路和步骤都是一样的

using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using BasicCommonResponsePtr = std::shared_ptr<BasicCommonResponse>;
using BasicConsumeResponsePtr = std::shared_ptr<BasicConsumeResponse>;class Channel
{
public:Channel(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec): _conn(conn), _codec(codec), _channel_id(UUIDHelper::uuid()) {}void openChannel(){// 组织请求OpenChannelRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);// 发送请求_codec->send(_conn, req);// 等待响应BasicCommonResponsePtr resp = waitResponse(rid);}void closeChannel(){// 组织请求CloseChannelRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);// 发送请求_codec->send(_conn, req);// 等待响应BasicCommonResponsePtr resp = waitResponse(rid);}bool declareVirtualHost(const std::string &vhost_name, const std::string &dbfile, const std::string &basedir){// 组织请求DeclareVirtualHostRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_dbfile(dbfile);req.set_basedir(basedir);// 发送请求_codec->send(_conn, req);// 等待响应BasicCommonResponsePtr resp = waitResponse(rid);// 返回结果return resp->ok();}bool eraseVirtualHost(const std::string &vhost_name){EraseVirtualHostRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}bool declareExchange(const std::string &vhost_name, const std::string &exchange_name, const ExchangeType type,bool durable, bool auto_delete, google::protobuf::Map<std::string, std::string> args)// 直接现代赋值写法搞定他{DeclareExchangeRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_exchange_name(exchange_name);req.set_type(type);req.set_durable(durable);req.set_auto_delete(auto_delete);req.mutable_args()->swap(args);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}bool eraseExchange(const std::string &vhost_name, const std::string &exchange_name){EraseExchangeRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_exchange_name(exchange_name);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}bool declareMsgQueue(const std::string &vhost_name, const std::string &queue_name, bool durable, bool exclusive,bool auto_delete, google::protobuf::Map<std::string, std::string> args){DeclareMsgQueueRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_queue_name(queue_name);req.set_durable(durable);req.set_exclusive(exclusive);req.set_auto_delete(auto_delete);req.mutable_args()->swap(args);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}bool eraseMsgQueue(const std::string &vhost_name, const std::string &queue_name){EraseMsgQueueRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_queue_name(queue_name);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}bool bind(const std::string &vhost_name, const std::string &exchange_name, const std::string &queue_name, const std::string &binding_key){BindRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_exchange_name(exchange_name);req.set_queue_name(queue_name);req.set_binding_key(binding_key);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}bool unBind(const std::string &vhost_name, const std::string &exchange_name, const std::string &queue_name){UnbindRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_exchange_name(exchange_name);req.set_queue_name(queue_name);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}bool BasicConsume(const std::string &vhost_name, const std::string &consumer_tag, const std::string &queue_name,const ConsumerCallback &callback, bool auto_ack){if (_consumer.get() != nullptr){default_error("队列订阅失败,因为该信道已经关联消费者了,关联的消费者tag:%s ,该订阅失败的消费者tag:%s",_consumer->_consumer_tag.c_str(),consumer_tag.c_str());return false;}BasicConsumeRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_consumer_tag(consumer_tag);req.set_queue_name(queue_name);req.set_auto_ack(auto_ack);// 发送请求_codec->send(_conn, req);std::ostringstream oss;BasicCommonResponsePtr resp = waitResponse(rid);if (resp->ok()){_consumer = std::make_shared<Consumer>(consumer_tag, callback, vhost_name, queue_name, auto_ack);default_info("关联消费者成功: %s",consumer_tag.c_str());}else{default_info("关联消费者失败: %s",consumer_tag.c_str());}return resp->ok();}bool BasicCancel(){std::ostringstream oss;if (_consumer.get() == nullptr){default_error("取消订阅失败 ,因为该信道并未关联消费者");return false;}BasicCancelRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(_consumer->_vhost_name);req.set_consumer_tag(_consumer->_consumer_tag);req.set_queue_name(_consumer->_queue_name);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);if (resp->ok()){_consumer.reset();oss << "取消订阅成功 ,队列名:" << _consumer->_queue_name << "\n";}else{oss << "服务端取消订阅失败, 故消费者删除失败, 消费者tag: " << _consumer->_consumer_tag << "\n";}return resp->ok();}bool BasicPublish(const std::string &vhost_name, const std::string &exchange_name, const BasicProperities *bp, const std::string &body){BasicPublishRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_exchange_name(exchange_name);req.set_body(body);if (bp != nullptr){req.mutable_properities()->set_msg_id(bp->msg_id());req.mutable_properities()->set_mode(bp->mode());req.mutable_properities()->set_routing_key(bp->routing_key());}// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);std::ostringstream oss;if (resp->ok()){default_info("发布消息成功 %s",body.c_str());}else{default_info("发布消息失败 %s",body.c_str());}return resp->ok();}bool BasicAck(const std::string &vhost_name, const std::string &queue_name, const std::string &msg_id){if (_consumer.get() == nullptr){default_error("确认消息失败 ,因为该信道并未关联消费者");return false;}BasicAckRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_queue_name(queue_name);req.set_msg_id(msg_id);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}private:BasicCommonResponsePtr waitResponse(const std::string &rid){std::unique_lock<std::mutex> ulock(_mutex);// 当lambda返回true时,才能出来; 注意:this指针只能传值捕捉// 因为lambda支持拷贝构造,因此lambda可以通过创建副本在该函数外部存活// 而如果引用捕捉this指针的话,那么在函数外部这个this指针就成为了野指针// 因此lambda无法捕捉this_cv.wait(ulock, [&rid, this]() -> bool{ return _resp_map.count(rid) > 0; });return _resp_map[rid];}muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;std::string _channel_id;Consumer::ptr _consumer; // 该信道关联的消费者std::mutex _mutex;std::condition_variable _cv;std::unordered_map<std::string, BasicCommonResponsePtr> _resp_map;
};

2.对连接模块提供的接口

  1. 因为只有连接模块才能监听响应服务器发来的BasicCommonResponse
    所以需要给连接模块提供一个放哈希表当中放响应的接口

  2. 当连接模块收到BasicConsumeResponse时,能够通过该resp找到对应的信道,不过无法拿到其内部的消费者的消费处理回调函数进行调用,因此需要信道模块提供一个能够调用其内部消费者的消费处理回调函数的接口

// 给连接模块用的接口
public:
std::string cid() const
{// 因为_channel_id在构造函数的初始化列表阶段初始化之后就再也不修改了,所以这里无需加锁,提高效率return _channel_id;
}// 连接收到基础响应之后,向hash表中添加响应
void putResponse(const BasicCommonResponsePtr &resp)
{{std::unique_lock<std::mutex> ulock(_mutex);_resp_map[resp->req_id()] = resp;}_cv.notify_all(); // 把所有阻塞的线程都唤醒,让他们看看自己能否成功继续往下运行
}// 连接收到消息推送之后,需要通过信道找到对应的消费者对象,通过回调函数进行消息处理
void consume(const BasicConsumeResponsePtr &resp)
{// 1.看该resp的信道是否相同if (resp->channel_id() != _channel_id){default_info("消息消费失败,因为resp的信道ID跟本信道ID不同:resp->channel_id():%s ,本信道ID:%s",resp->channel_id().c_str(),_channel_id.c_str());return;}// 2.看是否有消费者if (_consumer.get() == nullptr){default_info("消息消费失败,因为该信道没有消费者");return;}// 3.看该resp的消费者是否相同if (resp->consumer_tag() != _consumer->_consumer_tag){default_info("消息消费失败,因为resp的消费者tag跟本消费者tag不同:resp->channel_id():%s ,本信道ID:%s",resp->consumer_tag().c_str(),_consumer->_consumer_tag.c_str());return;}// 3.调用该消费者的消费处理回调函数_consumer->_callback(resp->consumer_tag(),resp->mutable_properities(),resp->body());
}

3.为何客户端的Channel也要有创建信道这个函数?

因为客户端的这些函数都是对服务器发送相应的请求而已
所以客户端的Channel不会受到自身的限制,也可以这么理解:

客户端的Channel的OpenChannel这个函数是创建服务器方的Channel

但是客户端的Connection的OpenChannel则是创建客户端方的Channel

4.完整代码

using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using BasicCommonResponsePtr = std::shared_ptr<BasicCommonResponse>;
using BasicConsumeResponsePtr = std::shared_ptr<BasicConsumeResponse>;class Channel
{
public:using ptr=std::shared_ptr<Channel>;Channel(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec): _conn(conn), _codec(codec), _channel_id(UUIDHelper::uuid()) {}void openChannel(){// 组织请求OpenChannelRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);// 发送请求_codec->send(_conn, req);// 等待响应BasicCommonResponsePtr resp = waitResponse(rid);}void closeChannel(){// 组织请求CloseChannelRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);// 发送请求_codec->send(_conn, req);// 等待响应BasicCommonResponsePtr resp = waitResponse(rid);}bool declareVirtualHost(const std::string &vhost_name, const std::string &dbfile, const std::string &basedir){// 组织请求DeclareVirtualHostRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_dbfile(dbfile);req.set_basedir(basedir);// 发送请求_codec->send(_conn, req);// 等待响应BasicCommonResponsePtr resp = waitResponse(rid);// 返回结果return resp->ok();}bool eraseVirtualHost(const std::string &vhost_name){EraseVirtualHostRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}bool declareExchange(const std::string &vhost_name, const std::string &exchange_name, const ExchangeType type,bool durable, bool auto_delete, google::protobuf::Map<std::string, std::string> args)// 直接现代赋值写法搞定他{DeclareExchangeRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_exchange_name(exchange_name);req.set_type(type);req.set_durable(durable);req.set_auto_delete(auto_delete);req.mutable_args()->swap(args);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}bool eraseExchange(const std::string &vhost_name, const std::string &exchange_name){EraseExchangeRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_exchange_name(exchange_name);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}bool declareMsgQueue(const std::string &vhost_name, const std::string &queue_name, bool durable, bool exclusive,bool auto_delete, google::protobuf::Map<std::string, std::string> args){DeclareMsgQueueRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_queue_name(queue_name);req.set_durable(durable);req.set_exclusive(exclusive);req.set_auto_delete(auto_delete);req.mutable_args()->swap(args);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}bool eraseMsgQueue(const std::string &vhost_name, const std::string &queue_name){EraseMsgQueueRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_queue_name(queue_name);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}bool bind(const std::string &vhost_name, const std::string &exchange_name, const std::string &queue_name, const std::string &binding_key){BindRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_exchange_name(exchange_name);req.set_queue_name(queue_name);req.set_binding_key(binding_key);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}bool unBind(const std::string &vhost_name, const std::string &exchange_name, const std::string &queue_name){UnbindRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_exchange_name(exchange_name);req.set_queue_name(queue_name);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}bool BasicConsume(const std::string &vhost_name, const std::string &consumer_tag, const std::string &queue_name,const ConsumerCallback &callback, bool auto_ack){if (_consumer.get() != nullptr){default_error("队列订阅失败,因为该信道已经关联消费者了,关联的消费者tag:%s ,该订阅失败的消费者tag:%s",_consumer->_consumer_tag.c_str(),consumer_tag.c_str());}BasicConsumeRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_consumer_tag(consumer_tag);req.set_queue_name(queue_name);req.set_auto_ack(auto_ack);// 发送请求_codec->send(_conn, req);std::ostringstream oss;BasicCommonResponsePtr resp = waitResponse(rid);if (resp->ok()){_consumer = std::make_shared<Consumer>(consumer_tag, queue_name, callback, auto_ack);default_info("关联消费者成功: %s",consumer_tag.c_str());}else{default_info("关联消费者失败: %s",consumer_tag.c_str());}return resp->ok();}bool BasicCancel(){std::ostringstream oss;if (_consumer.get() == nullptr){default_error("取消订阅失败 ,因为该信道并未关联消费者");return false;}BasicCancelRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(_consumer->_vhost_name);req.set_consumer_tag(_consumer->_consumer_tag);req.set_queue_name(_consumer->_queue_name);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);if (resp->ok()){_consumer.reset();oss << "取消订阅成功 ,队列名:" << _consumer->_queue_name << "\n";}else{oss << "服务端取消订阅失败, 故消费者删除失败, 消费者tag: " << _consumer->_consumer_tag << "\n";}return resp->ok();}bool BasicPublish(const std::string &vhost_name, const std::string &exchange_name, const BasicProperities *bp, const std::string &body){BasicPublishRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_exchange_name(exchange_name);req.set_body(body);if (bp != nullptr){req.mutable_properities()->set_msg_id(bp->msg_id());req.mutable_properities()->set_mode(bp->mode());req.mutable_properities()->set_routing_key(bp->routing_key());}// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);std::ostringstream oss;if (resp->ok()){default_info("发布消息成功 %s",body.c_str());}else{default_info("发布消息失败 %s",body.c_str());}return resp->ok();}bool BasicAck(const std::string &vhost_name, const std::string &queue_name, const std::string &msg_id){BasicAckRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_queue_name(queue_name);req.set_msg_id(msg_id);// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}// 给连接模块用的接口
public:std::string cid() const{// 因为_channel_id在构造函数的初始化列表阶段初始化之后就再也不修改了,所以这里无需加锁,提高效率return _channel_id;}// 连接收到基础响应之后,向hash表中添加响应void putResponse(const BasicCommonResponsePtr &resp){{std::unique_lock<std::mutex> ulock(_mutex);_resp_map[resp->req_id()] = resp;}_cv.notify_all(); // 把所有阻塞的线程都唤醒,让他们看看自己能否成功继续往下运行}// 连接收到消息推送之后,需要通过信道找到对应的消费者对象,通过回调函数进行消息处理void consume(const BasicConsumeResponsePtr &resp){std::ostringstream oss;// 1.看该resp的信道是否相同if (resp->channel_id() != _channel_id){default_info("消息消费失败,因为resp的信道ID跟本信道ID不同:resp->channel_id():%s ,本信道ID:%s",resp->channel_id().c_str(),_channel_id.c_str());return;}// 2.看是否有消费者if (_consumer.get() == nullptr){default_info("消息消费失败,因为该信道没有消费者");return;}// 3.看该resp的消费者是否相同if (resp->consumer_tag() != _consumer->_consumer_tag){default_info("消息消费失败,因为resp的消费者tag跟本消费者tag不同:resp->channel_id():%s ,本信道ID:%s",resp->consumer_tag().c_str(),_consumer->_consumer_tag.c_str());return;}// 3.调用该消费者的消费处理回调函数_consumer->_callback(resp->consumer_tag(),resp->mutable_properities(),resp->body());}private:BasicCommonResponsePtr waitResponse(const std::string &rid){std::unique_lock<std::mutex> ulock(_mutex);// 当lambda返回true时,才能出来; 注意:this指针只能传值捕捉// 因为lambda支持拷贝构造,因此lambda可以通过创建副本在该函数外部存活// 而如果引用捕捉this指针的话,那么在函数外部这个this指针就成为了野指针// 因此lambda无法捕捉this_cv.wait(ulock, [&rid, this]() -> bool{ return _resp_map.count(rid) > 0; });return _resp_map[rid];}muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;std::string _channel_id;Consumer::ptr _consumer; // 该信道关联的消费者std::mutex _mutex;std::condition_variable _cv;std::unordered_map<std::string, BasicCommonResponsePtr> _resp_map;
};

3.信道管理模块的实现

下面就是增、删、查
哈希表+互斥锁

class ChannelManager
{
public:using ptr = std::shared_ptr<ChannelManager>;Channel::ptr createChannel(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec){std::unique_lock<std::mutex> ulock(_mutex);Channel::ptr cp = std::make_shared<Channel>(conn, codec);_channel_map.insert(std::make_pair(cp->cid(), cp));return cp;}void removeChannel(const std::string &channel_id){std::unique_lock<std::mutex> ulock(_mutex);_channel_map.erase(channel_id);}Channel::ptr getChannel(const std::string &channel_id){std::unique_lock<std::mutex> ulock(_mutex);auto iter = _channel_map.find(channel_id);if (iter == _channel_map.end())return Channel::ptr();return iter->second;}private:std::mutex _mutex;std::unordered_map<std::string, Channel::ptr> _channel_map;
};

四、异步工作线程模块

#pragma once#include "../mqhelper/async_pool.hpp"
#include "muduo/net/EventLoopThread.h"
#include <memory>
/*
异步工作线程模块:1. muduo库中客户端连接的异步循环线程EventLoopThread2. 一个是当收到消息后进行异步处理的工作线程池这两项都不是以连接为单元进行创建的,而是创建后,可以用以多个连接中,因此单独进行封装
*/
namespace ns_mq
{struct AsyncWorker{using ptr=std::shared_ptr<AsyncWorker>;ns_helper::threadpool _pool;muduo::net::EventLoopThread _loopthread;};
}

五、连接模块

0.连接模块的细节点

注意:我们的连接模块并不提供查询来获取信道的操作
只支持在创建信道时获取到的新增信道

这是为了保证信道资源的隔离性,就像是学校食堂当中的号码牌只有在申请的时候才能获取,而不支持查询获取一样
就是为了保证人和号码牌的一一对应

这里也是为了保证线程和号码牌的一一对应
只不过这里一个线程可以领多个号码牌,而一个号码牌只能被一个线程所拥有

1.信道池

当然大家可能会想:能不能先早创建一些信道,搞一个信道池呢?
这样就能实现信道复用了啊,就像是学校里面的号码牌就是号码牌池一样…

2.实现信道池,资源隔离会受到影响吗?

RabbitMQ实现信道池的话,资源隔离并不会受到影响,因为实际上信道所实现的资源隔离是一种访问方式上的隔离,通过信道来对线程进行隔离,使得每个线程访问的资源不会出现交集。

不会出现某个线程向某个交换机发送数据时,另一个线程恰好刚删除完该交换机这种现象

因此信道是可以进行复用的,因为信道本身并不包含任何资源,它所使用的资源全都是连接对应资源,所访问操作资源全都是整个broker服务器当中的资源

换言之,信道仅仅只是一个“令牌/中间层”,只是用来隔离使用时对应访问资源的

完全类似于学校食堂当中的号码牌,号码牌本身并不包含资源,仅仅只是用来建立起学生和饭菜一一对应的关系而已,号码牌可以复用,信道也可以

信道池我们就放在扩展版本实现了,因为RabbitMQ本身并没有提供信道池,为了追求设计上的简化,把这个工作交给了我们客户端编写者

信道池的主要思想是通过重用现有的信道来减少创建和销毁信道的开销,这在高并发的场景下尤其有用而RabbitMQ本身就更适合在高并发的大数据场景下使用

3.网络搭建模块的实现

class Connection
{
public:Connection(const std::string &server_ip, uint16_t server_port,,const AsyncWorker::ptr& worker): _latch(1), _worker(worker), _client(_worker->_loopthread.startLoop(), muduo::net::InetAddress(server_ip, server_port), "Client"), _dispatcher(std::bind(&Connection::OnUnknownCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)), _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))), _channel_manager(std::make_shared<ChannelManager>()){_dispatcher.registerMessageCallback<BasicCommonResponse>(std::bind(&Connection::OnCommonResponse, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<BasicConsumeResponse>(std::bind(&Connection::OnConsumeResponse, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setConnectionCallback(std::bind(OnConnectionCallback, this, std::placeholders::_1));_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));connect();}Channel::ptr OpenChannel();void CloseChannel(const Channel::ptr &cp);private:void connect(){// 1. 客户端发起连接_client.connect();// 2. 等待连接建立成功_latch.wait();}void OnUnknownCallback(const muduo::net::TcpConnectionPtr &conn, const ns_google::MessagePtr &message, muduo::Timestamp){default_info("未知请求, 我们将断开该连接");if (conn->connected()){conn->shutdown();}}void OnConnectionCallback(const muduo::net::TcpConnectionPtr &conn){std::ostringstream oss;if (conn->connected()){_conn = conn;_latch.countDown();default_info("连接建立成功");}else{_conn.reset();default_info("连接断开成功");}}void OnCommonResponse(const muduo::net::TcpConnectionPtr &conn, const BasicCommonResponsePtr &resp, muduo::Timestamp);void OnConsumeResponse(const muduo::net::TcpConnectionPtr &conn, const BasicConsumeResponsePtr &resp, muduo::Timestamp);AsyncWorker::ptr _worker;muduo::CountDownLatch _latch;muduo::net::TcpConnectionPtr _conn;muduo::net::TcpClient _client;ProtobufDispatcher _dispatcher;ProtobufCodecPtr _codec;ChannelManager::ptr _channel_manager;
};

4.打开关闭信道的实现

打开信道:

  1. 创建信道
  2. 打开信道

同理,关闭信道:

  1. 关闭信道
  2. 销毁信道
void OpenChannel()
{// 1.创建channelChannel::ptr cp = _channel_manager->createChannel(_conn, _codec);// 2. 打开channelcp->openChannel();
}void CloseChannel(const Channel::ptr &cp)
{// 1. 关闭channelcp->closeChannel();// 2. 销毁channel_channel_manager->removeChannel(cp->cid());
}

5.两个响应回调函数的实现

无非就是先找到信道,然后调用其对应的函数
只不过信道的consume函数可以包装一下抛入线程池

void OnCommonResponse(const muduo::net::TcpConnectionPtr &conn, const BasicCommonResponsePtr &resp, muduo::Timestamp)
{// 找到该信道,然后将该响应添加到对应信道维护的相应哈希表当中Channel::ptr cp = _channel_manager->getChannel(resp->channel_id());if (cp.get() == nullptr){default_info("未找到该信道, 信道ID: %s",resp->channel_id().c_str());return;}cp->putResponse(resp);
}void OnConsumeResponse(const muduo::net::TcpConnectionPtr &conn, const BasicConsumeResponsePtr &resp, muduo::Timestamp)
{// 1.找到信道Channel::ptr cp = _channel_manager->getChannel(resp->channel_id());if (cp.get() == nullptr){default_info("未找到该信道, 信道ID: %s",resp->channel_id().c_str());return;}// 2.将 调用该信道对应的consume任务包装一下抛入线程池_worker->_pool.put([cp, resp](){ cp->consume(resp); });
}

6.完整代码

class Connection
{
public:using ptr=std::shared_ptr<Connection>;Connection(const std::string &server_ip, uint16_t server_port,,const AsyncWorker::ptr& worker): _latch(1), _worker(worker), _client(_worker->_loopthread.startLoop(), muduo::net::InetAddress(server_ip, server_port), "Client"), _dispatcher(std::bind(&Connection::OnUnknownCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)), _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))), _channel_manager(std::make_shared<ChannelManager>()){_dispatcher.registerMessageCallback<BasicCommonResponse>(std::bind(&Connection::OnCommonResponse, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<BasicConsumeResponse>(std::bind(&Connection::OnConsumeResponse, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setConnectionCallback(std::bind(OnConnectionCallback, this, std::placeholders::_1));_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));connect();}Channel::ptr OpenChannel(){// 1.创建channelChannel::ptr cp = _channel_manager->createChannel(_conn, _codec);// 2. 打开channelcp->openChannel();return cp;}void CloseChannel(const Channel::ptr &cp){// 1. 关闭channelcp->closeChannel();// 2. 销毁channel_channel_manager->removeChannel(cp->cid());}private:void connect(){// 1. 客户端发起连接_client.connect();// 2. 等待连接建立成功_latch.wait();}void OnUnknownCallback(const muduo::net::TcpConnectionPtr &conn, const ns_google::MessagePtr &message, muduo::Timestamp){default_info("未知请求, 我们将断开该连接");if (conn->connected()){conn->shutdown();}}void OnConnectionCallback(const muduo::net::TcpConnectionPtr &conn){std::ostringstream oss;if (conn->connected()){_conn = conn;_latch.countDown();default_info("连接建立成功");}else{_conn.reset();default_info("连接断开成功");}}void OnCommonResponse(const muduo::net::TcpConnectionPtr &conn, const BasicCommonResponsePtr &resp, muduo::Timestamp){// 找到该信道,然后将该响应添加到对应信道维护的相应哈希表当中Channel::ptr cp = _channel_manager->getChannel(resp->channel_id());if (cp.get() == nullptr){default_info("未找到该信道, 信道ID: %s",resp->channel_id().c_str());return;}cp->putResponse(resp);}void OnConsumeResponse(const muduo::net::TcpConnectionPtr &conn, const BasicConsumeResponsePtr &resp, muduo::Timestamp){// 1.找到信道Channel::ptr cp = _channel_manager->getChannel(resp->channel_id());if (cp.get() == nullptr){default_info("未找到该信道, 信道ID: %s",resp->channel_id().c_str());return;}// 2.将 调用该信道对应的consume任务包装一下抛入线程池_worker->_pool.put([cp, resp](){ cp->consume(resp); });}AsyncWorker::ptr _worker;muduo::CountDownLatch _latch;muduo::net::TcpConnectionPtr _conn;muduo::net::TcpClient _client;ProtobufDispatcher _dispatcher;ProtobufCodecPtr _codec;ChannelManager::ptr _channel_manager;
};

以上就是项目第十一弹:客户端模块设计与实现的全部内容

http://www.cadmedia.cn/news/9684.html

相关文章:

  • 沈阳关键词搜索排名天津百度关键词seo
  • 国内卡一卡二卡三网站视频黄页网站推广公司
  • 网站建设服务合同交印花税吗百度竞价被换着ip点击
  • 昌平做网站的公司关键词优化精灵
  • wordpress 全景图插件江西网络推广seo
  • 中国建设厅网站首页百度官网入口链接
  • 微网站开发平台免费网站优化一年多少钱
  • 韩文网站建设企业邮箱登录入口
  • 怎么自己做游戏软件seo网络推广外包公司
  • 服务商登封搜索引擎优化
  • 网站建设网站公司哪家好统计工具
  • 中国互联网数据平台官网seo引擎优化平台培训
  • 重庆建站公司官网seo助理
  • 上海网站建设网页营销技巧和营销方法
  • 江西商城网站建设网站建设平台哪家好
  • 沈阳市城乡建设局网站惠州网站营销推广
  • 海报素材网重庆seo优
  • 曲阜网站建设哪家便宜西安seo优化培训
  • 企业网站建设源码+微信+手机成免费的crm
  • 南通优普高端网站建设今天刚刚的最新新闻
  • 网页设计素材有两种分别是什么抖音seo优化软件
  • 超市网站建设策划书互联网产品推广是做什么的
  • 毕业设计网站源码百度视频推广
  • 建筑行业真的凉了吗搜狗seo
  • 温州网站建设团队seo公司推荐
  • ps免费模板网站淄博网站seo
  • 旅游景点网站优化排名易下拉软件
  • 天河网站建设企业成都新站软件快速排名
  • 河北中瑞建设集团有限公司网站营销网站建设大概费用
  • 高端网站建设的介绍朋友圈推广平台