之前的源码文章分析了Node.js Inspector的使用和原理,并粗略地分析了其源码,解析因为Node.js Inspector的源码实现非常复杂,逻辑又非常绕,解析所以本文打算更深入、源码更通俗地讲解Node.js Inspector的解析实现。
当我们以以下方式执行我们的源码应用时
node inspect app.jsNode.js在启动的过程中,就会初始化Inspector相关的解析逻辑。
inspector_agent_ = std::make_unique<inspector::Agent>(this);Agent是源码负责和V8 Inspector通信的对象。创建完后接着执行env->InitializeInspector({ })启动Agent。解析
inspector_agent_->Start(...);Start继续执行Agent::StartIoThread。源码
bool Agent::StartIoThread() { io_ = InspectorIo::Start(client_->getThreadHandle(),解析 ...); return true; }StartIoThread中的client_->getThreadHandle()是重要的逻辑,我们先来分析该函数。源码
std::shared_ptr<MainThreadHandle> getThreadHandle() { if (!interface_) { interface_ = std::make_shared<MainThreadInterface>(env_->inspector_agent(),解析 ...); } return interface_->GetHandle(); }getThreadHandle首先创建来一个MainThreadInterface对象,接着又调用了他的源码GetHandle方法,我们看一下该方法的逻辑。
std::shared_ptr<MainThreadHandle> MainThreadInterface::GetHandle() { if (handle_ == nullptr) handle_ = std::make_shared<MainThreadHandle>(this); return handle_; }GetHandlei了创建了一个MainThreadHandle对象,最终结构如下所示。
分析完后我们继续看Agent::StartIoThread中InspectorIo::Start的逻辑。香港云服务器
std::unique_ptr<InspectorIo> InspectorIo::Start(std::shared_ptr<MainThreadHandle> main_thread, ...) { auto io = std::unique_ptr<InspectorIo>(new InspectorIo(main_thread, ...)); return io; }InspectorIo::Star里新建了一个InspectorIo对象,我们看看InspectorIo构造函数的逻辑。
InspectorIo::InspectorIo(std::shared_ptr<MainThreadHandle> main_thread, ...) : // 初始化main_thread_ main_thread_(main_thread)) { // 新建一个子线程,子线程中执行InspectorIo::ThreadMain uv_thread_create(&thread_, InspectorIo::ThreadMain, this); }这时候结构如下。
Inspector在子线程里启动的原因主要有两个。
1 如果在主线程里运行,那么当我们断点调试的时候,Node.js主线程就会被停住,也就无法处理客户端发过来的调试指令。
2 如果主线程陷入死循环,我们就无法实时抓取进程的profile数据来分析原因。接着继续看一下子线程里执行InspectorIo::ThreadMain的逻辑。
void InspectorIo::ThreadMain(void* io) { static_cast<InspectorIo*>(io)->ThreadMain(); } void InspectorIo::ThreadMain() { uv_loop_t loop; loop.data = nullptr; // 在子线程开启一个新的事件循环 int err = uv_loop_init(&loop); std::shared_ptr<RequestQueueData> queue(new RequestQueueData(&loop), ...); // 新建一个delegate,用于处理请求 std::unique_ptr<InspectorIoDelegate> delegate( new InspectorIoDelegate(queue, main_thread_, ...) ); InspectorSocketServer server(std::move(delegate), ...); server.Start() uv_run(&loop, UV_RUN_DEFAULT); }ThreadMain里主要三个逻辑
1 创建一个delegate对象,该对象是核心的对象,后面我们会看到有什么作用。
2 创建一个服务器并启动。
3 开启事件循环。接下来看一下服务器的逻辑,首先看一下创建服务器的逻辑。
InspectorSocketServer::InspectorSocketServer(std::unique_ptr<SocketServerDelegate> delegate, ...) : // 保存delegate delegate_(std::move(delegate)), // 初始化sessionId next_session_id_(0) { // 设置delegate的server为当前服务器 delegate_->AssignServer(this); }执行完后形成以下结构。云南idc服务商
接着我们看启动服务器的逻辑。
bool InspectorSocketServer::Start() { // DNS解析,比如输入的是localhost struct addrinfo hints; memset(&hints, 0, sizeof(hints)); hints.ai_flags = AI_NUMERICSERV; hints.ai_socktype = SOCK_STREAM; uv_getaddrinfo_t req; const std::string port_string = std::to_string(port_); uv_getaddrinfo(loop_, &req, nullptr, host_.c_str(), port_string.c_str(), &hints); // 监听解析到的ip列表 for (addrinfo* address = req.addrinfo; address != nullptr; address = address->ai_next) { auto server_socket = ServerSocketPtr(new ServerSocket(this)); err = server_socket->Listen(address->ai_addr, loop_); if (err == 0) server_sockets_.push_back(std::move(server_socket)); } return true; }首先根据参数做一个DNS解析,然后根据拿到的ip列表(通常是一个),创建对应个数的ServerSocket对象,并执行他的Listen方法。ServerSocket表示一个监听socket。看一下ServerSocket的构造函数。
ServerSocket(InspectorSocketServer* server) : tcp_socket_(uv_tcp_t()), server_(server) { }执行完后结构如下。
接着看一下ServerSocket的Listen方法。
int ServerSocket::Listen(sockaddr* addr, uv_loop_t* loop) { uv_tcp_t* server = &tcp_socket_; uv_tcp_init(loop, server) uv_tcp_bind(server, addr, 0); uv_listen(reinterpret_cast<uv_stream_t*>(server), 511, ServerSocket::SocketConnectedCallback); }Listen调用Libuv的接口完成服务器的启动。至此,Inspector提供的Weboscket服务器启动了。
从刚才分析中可以看到,当有连接到来时执行回调ServerSocket::SocketConnectedCallback。
void ServerSocket::SocketConnectedCallback(uv_stream_t* tcp_socket, int status) { if (status == 0) { // 根据Libuv handle找到对应的ServerSocket对象 ServerSocket* server_socket = ServerSocket::FromTcpSocket(tcp_socket); // Socket对象的server_字段保存了所在的InspectorSocketServer server_socket->server_->Accept(server_socket->port_, tcp_socket); } }接着看InspectorSocketServer的Accept是如何处理连接的服务器托管。
void InspectorSocketServer::Accept(int server_port, uv_stream_t* server_socket) { std::unique_ptr<SocketSession> session( new SocketSession(this, next_session_id_++, server_port) ); InspectorSocket::DelegatePointer delegate = InspectorSocket::DelegatePointer( new SocketSession::Delegate(this, session->id()) ); InspectorSocket::Pointer inspector = InspectorSocket::Accept(server_socket, std::move(delegate)); if (inspector) { session->Own(std::move(inspector)); connected_sessions_[session->id()].second = std::move(session); } }Accept的首先创建里一个SocketSession和SocketSession::Delegate对象。然后调用InspectorSocket::Accept,从代码中可以看到InspectorSocket::Accept会返回一个InspectorSocket对象。InspectorSocket是对通信socket的封装(和客户端通信的socket,区别于服务器的监听socket)。然后记录session对象对应的InspectorSocket对象,同时记录sessionId和session的映射关系。结构如下图所示。
接着看一下InspectorSocket::Accept返回InspectorSocket的逻辑。
InspectorSocket::Pointer InspectorSocket::Accept(uv_stream_t* server, DelegatePointer delegate) { auto tcp = TcpHolder::Accept(server, std::move(delegate)); InspectorSocket* inspector = new InspectorSocket(); inspector->SwitchProtocol(new HttpHandler(inspector, std::move(tcp))); return InspectorSocket::Pointer(inspector); }InspectorSocket::Accept的代码不多,但是逻辑还是挺多的。
TcpHolder::Pointer TcpHolder::Accept( uv_stream_t* server, InspectorSocket::DelegatePointer delegate) { // 新建一个TcpHolder对象,TcpHolder是对uv_tcp_t和delegate的封装 TcpHolder* result = new TcpHolder(std::move(delegate)); // 拿到TcpHolder对象的uv_tcp_t结构体 uv_stream_t* tcp = reinterpret_cast<uv_stream_t*>(&result->tcp_); // 初始化 int err = uv_tcp_init(server->loop, &result->tcp_); // 摘取一个TCP连接对应的fd保存到TcpHolder的uv_tcp_t结构体中(即第二个参数的tcp字段) uv_accept(server, tcp); // 注册等待可读事件,有数据时执行OnDataReceivedCb回调 uv_read_start(tcp, allocate_buffer, OnDataReceivedCb); return TcpHolder::Pointer(result); }2 新建一个HttpHandler对象。
explicit HttpHandler(InspectorSocket* inspector, TcpHolder::Pointer tcp) : ProtocolHandler(inspector, std::move(tcp)){ llhttp_init(&parser_, HTTP_REQUEST, &parser_settings); llhttp_settings_init(&parser_settings); parser_settings.on_header_field = OnHeaderField; parser_settings.on_header_value = OnHeaderValue; parser_settings.on_message_complete = OnMessageComplete; parser_settings.on_url = OnPath; } ProtocolHandler::ProtocolHandler(InspectorSocket* inspector, TcpHolder::Pointer tcp) : inspector_(inspector), tcp_(std::move(tcp)) { // 设置TCP数据的handler,TCP是只负责传输,数据的解析交给handler处理 tcp_->SetHandler(this); }HttpHandler是对uv_tcp_t的封装,主要通过HTTP解析器llhttp对HTTP协议进行解析。
3 调用inspector->SwitchProtocol()切换当前协议为HTTP,建立TCP连接后,首先要经过一个HTTP请求从HTTP协议升级到WebSocket协议,升级成功后就使用Websocket协议进行通信。我们看一下这时候的结构图。
至此,就完成了连接处理的分析。
完成了TCP连接的处理后,接下来要完成协议升级,因为Inspector是通过WebSocket协议和客户端通信的,所以需要通过一个HTTP请求来完成HTTP到WebSocekt协议的升级。从刚才的分析中看当有数据到来时会执行OnDataReceivedCb回调。
void TcpHolder::OnDataReceivedCb(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) { TcpHolder* holder = From(tcp); holder->ReclaimUvBuf(buf, nread); // 调用handler的onData,目前handler是HTTP协议 holder->handler_->OnData(&holder->buffer); }TCP层收到数据后交给应用层解析,直接调用上层的OnData回调。
void OnData(std::vector<char>* data) override { // 解析HTTP协议 llhttp_execute(&parser_, data->data(), data->size()); // 解析完并且是升级协议的请求则调用delegate的回调OnSocketUpgrade delegate()->OnSocketUpgrade(event.host, event.path, event.ws_key); }OnData可能会被多次回调,并通过llhttp_execute解析收到的HTTP报文,当发现是一个协议升级的请求后,就调用OnSocketUpgrade回调。delegate是TCP层保存的SocketSession::Delegate对象。来看一下该对象的OnSocketUpgrade方法。
void SocketSession::Delegate::OnSocketUpgrade(const std::string& host, const std::string& path, const std::string& ws_key) { std::string id = path.empty() ? path : path.substr(1); server_->SessionStarted(session_id_, id, ws_key); }OnSocketUpgrade又调用来server_(InspectorSocketServer对象)的SessionStarted。
void InspectorSocketServer::SessionStarted(int session_id, const std::string& id, const std::string& ws_key) { // 找到对应的session对象 SocketSession* session = Session(session_id); connected_sessions_[session_id].first = id; session->Accept(ws_key); delegate_->StartSession(session_id, id); }首先通过session_id找到建立TCP连接时分配的SocketSession对象。
1 执行session->Accept(ws_key);回复客户端同意协议升级。
void Accept(const std::string& ws_key) { ws_socket_->AcceptUpgrade(ws_key); }从结构图我们可以看到ws_socket_是一个InspectorSocket对象。
void AcceptUpgrade(const std::string& accept_key) override { char accept_string[ACCEPT_KEY_LENGTH]; generate_accept_string(accept_key, &accept_string); const char accept_ws_prefix[] = "HTTP/1.1 101 Switching Protocols\r\n" "Upgrade: websocket\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Accept: "; const char accept_ws_suffix[] = "\r\n\r\n"; std::vector<char> reply(accept_ws_prefix, accept_ws_prefix + sizeof(accept_ws_prefix) - 1); reply.insert(reply.end(), accept_string, accept_string + sizeof(accept_string)); reply.insert(reply.end(), accept_ws_suffix, accept_ws_suffix + sizeof(accept_ws_suffix) - 1); // 回复101给客户端 WriteRaw(reply, WriteRequest::Cleanup); // 切换handler为WebSocket handler inspector_->SwitchProtocol(new WsHandler(inspector_, std::move(tcp_))); }AcceptUpgradeh首先回复客户端101表示同意升级道WebSocket协议,然后切换数据处理器为WsHandler,即后续的数据按照WebSocket协议处理。
2 执行delegate_->StartSession(session_id, id)建立和V8 Inspector的会话。delegate_是InspectorIoDelegate对象。
void InspectorIoDelegate::StartSession(int session_id, const std::string& target_id) { auto session = main_thread_->Connect( std::unique_ptr<InspectorSessionDelegate>( new IoSessionDelegate(request_queue_->handle(), session_id) ), true); if (session) { sessions_[session_id] = std::move(session); fprintf(stderr, "Debugger attached.\n"); } }首先通过main_thread_->Connect拿到一个session,并在InspectorIoDelegate中记录映射关系。结构图如下。
接下来看一下main_thread_->Connect的逻辑(main_thread_是MainThreadHandle对象)。
std::unique_ptr<InspectorSession> MainThreadHandle::Connect( std::unique_ptr<InspectorSessionDelegate> delegate, bool prevent_shutdown) { return std::unique_ptr<InspectorSession>( new CrossThreadInspectorSession(++next_session_id_, shared_from_this(), std::move(delegate), prevent_shutdown)); }Connect函数新建了一个CrossThreadInspectorSession对象。
CrossThreadInspectorSession( int id, std::shared_ptr<MainThreadHandle> thread, std::unique_ptr<InspectorSessionDelegate> delegate, bool prevent_shutdown) // 创建一个MainThreadSessionState对象 : state_(thread, std::bind(MainThreadSessionState::Create, std::placeholders::_1, prevent_shutdown)) { // 执行MainThreadSessionState::Connect state_.Call(&MainThreadSessionState::Connect, std::move(delegate)); }继续看MainThreadSessionState::Connect。
void Connect(std::unique_ptr<InspectorSessionDelegate> delegate) { Agent* agent = thread_->inspector_agent(); session_ = agent->Connect(std::move(delegate), prevent_shutdown_); }继续调agent->Connect。
std::unique_ptr<InspectorSession> Agent::Connect( std::unique_ptr<InspectorSessionDelegate> delegate, bool prevent_shutdown) { int session_id = client_->connectFrontend(std::move(delegate), prevent_shutdown); return std::unique_ptr<InspectorSession>( new SameThreadInspectorSession(session_id, client_)); }继续调connectFrontend
int connectFrontend(std::unique_ptr<InspectorSessionDelegate> delegate, bool prevent_shutdown) { int session_id = next_session_id_++; channels_[session_id] = std::make_unique<ChannelImpl>(env_, client_, getWorkerManager(), std::move(delegate), getThreadHandle(), prevent_shutdown); return session_id; }connectFrontend创建了一个ChannelImpl并且在channels_中保存了映射关系。看看ChannelImpl的构造函数。
explicit ChannelImpl(Environment* env, const std::unique_ptr<V8Inspector>& inspector, std::unique_ptr<InspectorSessionDelegate> delegate, ...) : delegate_(std::move(delegate)) { session_ = inspector->connect(CONTEXT_GROUP_ID, this, StringView()); }ChannelImpl调用inspector->connect建立了一个和V8 Inspector的会话。结构图大致如下。
TCP连接建立了,协议升级也完成了,接下来就可以开始处理业务数据。从前面的分析中我们已经知道数据到来时会执行TcpHoldler的handler_->OnData回调。因为已经完成了协议升级,所以这时候的handler变成了WeSocket handler。
void OnData(std::vector<char>* data) override { // 1. Parse. int processed = 0; do { processed = ParseWsFrames(*data); // 2. Fix the data size & length if (processed > 0) { remove_from_beginning(data, processed); } } while (processed > 0 && !data->empty()); }OnData通过ParseWsFrames解析WebSocket协议。
int ParseWsFrames(const std::vector<char>& buffer) { int bytes_consumed = 0; std::vector<char> output; bool compressed = false; // 解析WebSocket协议 ws_decode_result r = decode_frame_hybi17(buffer, true /* client_frame */, &bytes_consumed, &output, &compressed); // 执行delegate的回调 delegate()->OnWsFrame(output); return bytes_consumed; }前面已经分析过delegate是TcpHoldler的delegate,即SocketSession::Delegate对象。
void SocketSession::Delegate::OnWsFrame(const std::vector<char>& data) { server_->MessageReceived(session_id_, std::string(data.data(), data.size())); }继续回调server_->MessageReceived。从结构图可以看到server_是InspectorSocketServer对象。
void MessageReceived(int session_id, const std::string& message) { delegate_->MessageReceived(session_id, message); }继续回调delegate_->MessageReceived。InspectorSocketServer的delegate_是InspectorIoDelegate对象。
void InspectorIoDelegate::MessageReceived(int session_id, const std::string& message) { auto session = sessions_.find(session_id); if (session != sessions_.end()) session->second->Dispatch(Utf8ToStringView(message)->string()); }首先通过session_id找到对应的session。session是一个CrossThreadInspectorSession对象。看看他的Dispatch方法。
void Dispatch(const StringView& message) override { state_.Call(&MainThreadSessionState::Dispatch, StringBuffer::create(message)); }执行MainThreadSessionState::Dispatch。
void Dispatch(std::unique_ptr<StringBuffer> message) { session_->Dispatch(message->string()); }session_是SameThreadInspectorSession对象。
void SameThreadInspectorSession::Dispatch( const v8_inspector::StringView& message) { auto client = client_.lock(); if (client) client->dispatchMessageFromFrontend(session_id_, message); }继续调client->dispatchMessageFromFrontend。
void dispatchMessageFromFrontend(int session_id, const StringView& message) { channels_[session_id]->dispatchProtocolMessage(message); }通过session_id找到对应的ChannelImpl,继续调ChannelImpl的dispatchProtocolMessage。
voiddispatchProtocolMessage(const StringView& message) { session_->dispatchProtocolMessage(message); }最终调用和V8 Inspector的会话对象把数据发送给V8。至此客户端到V8 Inspector的通信过程就完成了。
接着看从V8 inspector到客户端的数据传递逻辑。V8 inspector是通过channel的sendResponse函数传递给客户端的。
void sendResponse( int callId, std::unique_ptr<v8_inspector::StringBuffer> message) override { sendMessageToFrontend(message->string()); } void sendMessageToFrontend(const StringView& message) { delegate_->SendMessageToFrontend(message); }delegate_是IoSessionDelegate对象。
void SendMessageToFrontend(const v8_inspector::StringView& message) override { request_queue_->Post(id_, TransportAction::kSendMessage, StringBuffer::create(message)); }request_queue_是RequestQueueData对象。
void Post(int session_id, TransportAction action, std::unique_ptr<StringBuffer> message) { Mutex::ScopedLock scoped_lock(state_lock_); bool notify = messages_.empty(); messages_.emplace_back(action, session_id, std::move(message)); if (notify) { CHECK_EQ(0, uv_async_send(&async_)); incoming_message_cond_.Broadcast(scoped_lock); } }Post首先把消息入队,然后通过异步的方式通知async_接着看async_的处理函数(在子线程的事件循环里执行)。
uv_async_init(loop, &async_, [](uv_async_t* async) { // 拿到async对应的上下文 RequestQueueData* wrapper = node::ContainerOf(&RequestQueueData::async_, async); // 执行RequestQueueData的DoDispatch wrapper->DoDispatch();});void DoDispatch() { for (const auto& request : GetMessages()) { request.Dispatch(server_); } }request是RequestToServer对象。
void Dispatch(InspectorSocketServer* server) const { switch (action_) { case TransportAction::kSendMessage: server->Send( session_id_, protocol::StringUtil::StringViewToUtf8(message_->string())); break; } }接着看InspectorSocketServer的Send。
void InspectorSocketServer::Send(int session_id, const std::string& message) { SocketSession* session = Session(session_id); if (session != nullptr) { session->Send(message); } }session代表可客户端的一个连接。
void SocketSession::Send(const std::string& message) { ws_socket_->Write(message.data(), message.length()); }接着调用WebSocket handler的Write。
void Write(const std::vector<char> data) override { std::vector<char> output = encode_frame_hybi17(data); WriteRaw(output, WriteRequest::Cleanup); }WriteRaw是基类ProtocolHandler实现的。
int ProtocolHandler::WriteRaw(const std::vector<char>& buffer, uv_write_cb write_cb) { return tcp_->WriteRaw(buffer, write_cb); }最终是通过TCP连接返回给客户端。
int TcpHolder::WriteRaw(const std::vector<char>& buffer, uv_write_cb write_cb) { // Freed in write_request_cleanup WriteRequest* wr = new WriteRequest(handler_, buffer); uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(&tcp_); int err = uv_write(&wr->req, stream, &wr->buf, 1, write_cb); if (err < 0) delete wr; return err < 0; }新建一个写请求,socket可写的时候发送数据给客户端。
后记:Node.js Inspector的原理虽然不复杂的,但是实现实在太绕了。