Skip to content

Commit 8d188b7

Browse files
committed
feature: add net collector
1 parent d8a913d commit 8d188b7

File tree

14 files changed

+1047
-2
lines changed

14 files changed

+1047
-2
lines changed

core/host_monitor/Constants.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ namespace logtail {
2323
std::filesystem::path PROCESS_DIR = "/proc";
2424
const std::filesystem::path PROCESS_STAT = "stat";
2525
const std::filesystem::path PROCESS_LOADAVG = "loadavg";
26+
const std::filesystem::path PROCESS_NET_SOCKSTAT = "net/sockstat";
27+
const std::filesystem::path PROCESS_NET_SOCKSTAT6 = "net/sockstat6";
28+
const std::filesystem::path PROCESS_NET_DEV = "net/dev";
2629
const int64_t SYSTEM_HERTZ = sysconf(_SC_CLK_TCK);
2730

2831
} // namespace logtail

core/host_monitor/Constants.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ namespace logtail {
2525
extern std::filesystem::path PROCESS_DIR;
2626
const extern std::filesystem::path PROCESS_STAT;
2727
const extern std::filesystem::path PROCESS_LOADAVG;
28+
29+
const extern std::filesystem::path PROCESS_NET_SOCKSTAT;
30+
const extern std::filesystem::path PROCESS_NET_SOCKSTAT6;
31+
const extern std::filesystem::path PROCESS_NET_DEV;
2832
const extern int64_t SYSTEM_HERTZ;
2933

3034
#ifdef __ENTERPRISE__

core/host_monitor/HostMonitorInputRunner.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "host_monitor/Constants.h"
3636
#include "host_monitor/HostMonitorTimerEvent.h"
3737
#include "host_monitor/collector/CPUCollector.h"
38+
#include "host_monitor/collector/NetCollector.h"
3839
#include "host_monitor/collector/ProcessEntityCollector.h"
3940
#include "host_monitor/collector/SystemCollector.h"
4041
#include "logger/Logger.h"
@@ -55,6 +56,7 @@ HostMonitorInputRunner::HostMonitorInputRunner() {
5556
RegisterCollector<ProcessEntityCollector>();
5657
RegisterCollector<CPUCollector>();
5758
RegisterCollector<SystemCollector>();
59+
RegisterCollector<NetCollector>();
5860

5961
size_t threadPoolSize = 1;
6062
// threadPoolSize should be greater than 0

core/host_monitor/LinuxSystemInterface.cpp

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "host_monitor/LinuxSystemInterface.h"
1818

1919
#include <chrono>
20+
#include <string>
2021

2122
using namespace std;
2223
using namespace std::chrono;
@@ -71,6 +72,195 @@ bool GetHostLoadavg(vector<string>& lines, string& errorMessage) {
7172
}
7273
return true;
7374
}
75+
bool ReadSocketStat(const std::filesystem::path& path, uint64_t& tcp) {
76+
tcp = 0;
77+
if (!path.empty()) {
78+
std::vector<std::string> sockstatLines;
79+
std::string errorMessage;
80+
if (!CheckExistance(path)) {
81+
errorMessage = "file does not exist: " + (path).string();
82+
return false;
83+
}
84+
85+
if (GetFileLines(path, sockstatLines, true, &errorMessage) != 0 || sockstatLines.empty()) {
86+
return false;
87+
}
88+
89+
for (auto const& line : sockstatLines) {
90+
if (line.size() >= 5 && (line.substr(0, 4) == "TCP:" || line.substr(0, 5) == "TCP6:")) {
91+
std::vector<std::string> metrics;
92+
boost::split(metrics, line, boost::is_any_of(" "), boost::token_compress_on);
93+
if (metrics.size() >= 9) {
94+
tcp += static_cast<uint64_t>(std::stoull(metrics[6])); // tw
95+
tcp += static_cast<uint64_t>(std::stoull(metrics[8])); // alloc
96+
}
97+
}
98+
}
99+
}
100+
return true;
101+
}
102+
103+
bool ReadNetLink(std::vector<uint64_t>& tcpStateCount) {
104+
static std::atomic_int sequence_number = 1;
105+
int fd;
106+
// struct inet_diag_msg *r;
107+
// 使用netlink socket与内核通信
108+
fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_INET_DIAG);
109+
if (fd < 0) {
110+
LOG_WARNING(sLogger,
111+
("ReadNetLink, socket(AF_NETLINK, SOCK_RAW, NETLINK_INET_DIAG) failed, error msg: ",
112+
std::string(strerror(errno))));
113+
return false;
114+
}
115+
116+
117+
// 存在多个netlink socket时,必须单独bind,并通过nl_pid来区分
118+
struct sockaddr_nl nladdr_bind {};
119+
memset(&nladdr_bind, 0, sizeof(nladdr_bind));
120+
nladdr_bind.nl_family = AF_NETLINK;
121+
nladdr_bind.nl_pad = 0;
122+
nladdr_bind.nl_pid = getpid();
123+
nladdr_bind.nl_groups = 0;
124+
if (bind(fd, (struct sockaddr*)&nladdr_bind, sizeof(nladdr_bind))) {
125+
LOG_WARNING(sLogger, ("ReadNetLink, bind netlink socket failed, error msg: ", std::string(strerror(errno))));
126+
close(fd);
127+
return false;
128+
}
129+
struct sockaddr_nl nladdr {};
130+
memset(&nladdr, 0, sizeof(nladdr));
131+
nladdr.nl_family = AF_NETLINK;
132+
struct NetLinkRequest req {};
133+
memset(&req, 0, sizeof(req));
134+
req.nlh.nlmsg_len = sizeof(req);
135+
req.nlh.nlmsg_type = TCPDIAG_GETSOCK;
136+
req.nlh.nlmsg_flags = NLM_F_ROOT | NLM_F_MATCH | NLM_F_REQUEST;
137+
// sendto kernel
138+
req.nlh.nlmsg_pid = getpid();
139+
req.nlh.nlmsg_seq = ++sequence_number;
140+
req.r.idiag_family = AF_INET;
141+
req.r.idiag_states = 0xfff;
142+
req.r.idiag_ext = 0;
143+
struct iovec iov {};
144+
memset(&iov, 0, sizeof(iov));
145+
iov.iov_base = &req;
146+
iov.iov_len = sizeof(req);
147+
struct msghdr msg {};
148+
memset(&msg, 0, sizeof(msg));
149+
msg.msg_name = (void*)&nladdr;
150+
msg.msg_namelen = sizeof(nladdr);
151+
msg.msg_iov = &iov;
152+
msg.msg_iovlen = 1;
153+
154+
if (sendmsg(fd, &msg, 0) < 0) {
155+
LOG_WARNING(sLogger, ("ReadNetLink, sendmsg(2) failed, error msg: ", std::string(strerror(errno))));
156+
close(fd);
157+
return false;
158+
}
159+
char buf[8192];
160+
iov.iov_base = buf;
161+
iov.iov_len = sizeof(buf);
162+
163+
uint64_t received_count = 0;
164+
uint64_t MAX_RECV_COUNT = std::numeric_limits<uint64_t>::max();
165+
while (received_count < MAX_RECV_COUNT) {
166+
received_count++;
167+
// struct nlmsghdr *h;
168+
memset(&msg, 0, sizeof(msg));
169+
msg.msg_name = (void*)&nladdr;
170+
msg.msg_namelen = sizeof(nladdr);
171+
msg.msg_iov = &iov;
172+
msg.msg_iovlen = 1;
173+
ssize_t status = recvmsg(fd, (struct msghdr*)&msg, 0);
174+
if (status < 0) {
175+
if (errno == EINTR || errno == EAGAIN) {
176+
continue;
177+
}
178+
LOG_WARNING(sLogger, ("ReadNetLink, recvmsg(2) failed, error msg: ", std::string(strerror(errno))));
179+
close(fd);
180+
return false;
181+
} else if (status == 0) {
182+
LOG_WARNING(sLogger,
183+
("ReadNetLink, Unexpected zero-sized reply from netlink socket. error msg: ",
184+
std::string(strerror(errno))));
185+
close(fd);
186+
return true;
187+
}
188+
189+
// h = (struct nlmsghdr *) buf;
190+
for (auto h = (struct nlmsghdr*)buf; NLMSG_OK(h, status); h = NLMSG_NEXT(h, status)) {
191+
if (static_cast<uint64_t>(h->nlmsg_seq) != static_cast<uint64_t>(sequence_number)) {
192+
// sequence_number is not equal
193+
// h = NLMSG_NEXT(h, status);
194+
continue;
195+
}
196+
197+
if (h->nlmsg_type == NLMSG_DONE) {
198+
close(fd);
199+
return true;
200+
} else if (h->nlmsg_type == NLMSG_ERROR) {
201+
if (h->nlmsg_len < NLMSG_LENGTH(sizeof(struct nlmsgerr))) {
202+
LOG_WARNING(sLogger, ("ReadNetLink ", "message truncated"));
203+
} else {
204+
auto msg_error = (struct nlmsgerr*)NLMSG_DATA(h);
205+
LOG_WARNING(sLogger, ("ReadNetLink, Received error, error msg: ", msg_error));
206+
}
207+
close(fd);
208+
return false;
209+
}
210+
auto r = (struct inet_diag_msg*)NLMSG_DATA(h);
211+
/*This code does not(need to) distinguish between IPv4 and IPv6.*/
212+
if (r->idiag_state > TCP_CLOSING || r->idiag_state < TCP_ESTABLISHED) {
213+
// Ignoring connection with unknown state
214+
continue;
215+
}
216+
tcpStateCount[r->idiag_state]++;
217+
// h = NLMSG_NEXT(h, status);
218+
}
219+
}
220+
close(fd);
221+
return true;
222+
}
223+
224+
bool GetNetStateByNetLink(NetState& netState) {
225+
std::vector<uint64_t> tcpStateCount(TCP_CLOSING + 1, 0);
226+
if (ReadNetLink(tcpStateCount) == false) {
227+
return false;
228+
}
229+
uint64_t tcp = 0, tcpSocketStat = 0;
230+
231+
if (ReadSocketStat(PROCESS_DIR / PROCESS_NET_SOCKSTAT, tcp)) {
232+
tcpSocketStat += tcp;
233+
}
234+
if (ReadSocketStat(PROCESS_DIR / PROCESS_NET_SOCKSTAT6, tcp)) {
235+
tcpSocketStat += tcp;
236+
}
237+
238+
int total = 0;
239+
for (int i = TCP_ESTABLISHED; i <= TCP_CLOSING; i++) {
240+
if (i == TCP_SYN_SENT || i == TCP_SYN_RECV) {
241+
total += tcpStateCount[i];
242+
}
243+
netState.tcpStates[i] = tcpStateCount[i];
244+
}
245+
// 设置为-1表示没有采集
246+
netState.tcpStates[TCP_TOTAL] = total + tcpSocketStat;
247+
netState.tcpStates[TCP_NON_ESTABLISHED] = netState.tcpStates[TCP_TOTAL] - netState.tcpStates[TCP_ESTABLISHED];
248+
return true;
249+
}
250+
251+
bool GetHostNetDev(vector<string>& lines, string& errorMessage) {
252+
errorMessage.clear();
253+
if (!CheckExistance(PROCESS_DIR / PROCESS_NET_DEV)) {
254+
errorMessage = "file does not exist: " + (PROCESS_DIR / PROCESS_NET_DEV).string();
255+
return false;
256+
}
257+
258+
int ret = GetFileLines(PROCESS_DIR / PROCESS_NET_DEV, lines, true, &errorMessage);
259+
if (ret != 0 || lines.empty()) {
260+
return false;
261+
}
262+
return true;
263+
}
74264

75265
bool LinuxSystemInterface::GetSystemInformationOnce(SystemInformation& systemInfo) {
76266
std::vector<std::string> lines;
@@ -221,4 +411,66 @@ bool LinuxSystemInterface::GetCPUCoreNumInformationOnce(CpuCoreNumInformation& c
221411
return true;
222412
}
223413

414+
bool LinuxSystemInterface::GetTCPStatInformationOnce(TCPStatInformation& tcpStatInfo) {
415+
NetState netState;
416+
417+
bool ret = false;
418+
419+
ret = GetNetStateByNetLink(netState);
420+
421+
if (ret) {
422+
tcpStatInfo.stat.tcpEstablished = (netState.tcpStates[TCP_ESTABLISHED]);
423+
tcpStatInfo.stat.tcpListen = (netState.tcpStates[TCP_LISTEN]);
424+
tcpStatInfo.stat.tcpTotal = (netState.tcpStates[TCP_TOTAL]);
425+
tcpStatInfo.stat.tcpNonEstablished = (netState.tcpStates[TCP_NON_ESTABLISHED]);
426+
}
427+
428+
return ret;
429+
}
430+
431+
bool LinuxSystemInterface::GetNetRateInformationOnce(NetRateInformation& netRateInfo) {
432+
// /proc/net/dev
433+
std::vector<std::string> netDevLines = {};
434+
std::string errorMessage;
435+
bool ret = GetHostNetDev(netDevLines, errorMessage);
436+
if (!ret || netDevLines.empty()) {
437+
return false;
438+
}
439+
440+
for (size_t i = 2; i < netDevLines.size(); ++i) {
441+
auto pos = netDevLines[i].find_first_of(':');
442+
std::string devCounterStr = netDevLines[i].substr(pos + 1);
443+
std::string devName = netDevLines[i].substr(0, pos);
444+
std::vector<std::string> netDevMetric;
445+
boost::algorithm::trim(devCounterStr);
446+
boost::split(netDevMetric, devCounterStr, boost::is_any_of(" "), boost::token_compress_on);
447+
448+
if (netDevMetric.size() >= 16) {
449+
NetInterfaceMetric information;
450+
int index = 0;
451+
boost::algorithm::trim(devName);
452+
information.name = devName;
453+
information.rxBytes = boost::lexical_cast<uint64_t>(netDevMetric[index++]);
454+
information.rxPackets = boost::lexical_cast<uint64_t>(netDevMetric[index++]);
455+
information.rxErrors = boost::lexical_cast<uint64_t>(netDevMetric[index++]);
456+
information.rxDropped = boost::lexical_cast<uint64_t>(netDevMetric[index++]);
457+
information.rxOverruns = boost::lexical_cast<uint64_t>(netDevMetric[index++]);
458+
information.rxFrame = boost::lexical_cast<uint64_t>(netDevMetric[index++]);
459+
// skip compressed multicast
460+
index += 2;
461+
information.txBytes = boost::lexical_cast<uint64_t>(netDevMetric[index++]);
462+
information.txPackets = boost::lexical_cast<uint64_t>(netDevMetric[index++]);
463+
information.txErrors = boost::lexical_cast<uint64_t>(netDevMetric[index++]);
464+
information.txDropped = boost::lexical_cast<uint64_t>(netDevMetric[index++]);
465+
information.txOverruns = boost::lexical_cast<uint64_t>(netDevMetric[index++]);
466+
information.txCollisions = boost::lexical_cast<uint64_t>(netDevMetric[index++]);
467+
information.txCarrier = boost::lexical_cast<uint64_t>(netDevMetric[index++]);
468+
469+
information.speed = -1;
470+
netRateInfo.metrics.push_back(information);
471+
}
472+
}
473+
return true;
474+
}
475+
224476
} // namespace logtail

core/host_monitor/LinuxSystemInterface.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class LinuxSystemInterface : public SystemInterface {
3939
bool GetCPUInformationOnce(CPUInformation& cpuInfo) override;
4040
bool GetProcessListInformationOnce(ProcessListInformation& processListInfo) override;
4141
bool GetProcessInformationOnce(pid_t pid, ProcessInformation& processInfo) override;
42+
bool GetTCPStatInformationOnce(TCPStatInformation& tcpStatInfo) override;
43+
bool GetNetRateInformationOnce(NetRateInformation& netRateInfo) override;
4244

4345
bool GetSystemLoadInformationOnce(SystemLoadInformation& systemLoadInfo) override;
4446
bool GetCPUCoreNumInformationOnce(CpuCoreNumInformation& cpuCoreNumInfo) override;

core/host_monitor/SystemInterface.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,28 @@ bool SystemInterface::GetCPUCoreNumInformation(CpuCoreNumInformation& cpuCoreNum
114114
errorType);
115115
}
116116

117+
bool SystemInterface::GetTCPStatInformation(TCPStatInformation& tcpStatInfo) {
118+
const std::string errorType = "TCP stat";
119+
return MemoizedCall(
120+
mTCPStatInformationCache,
121+
[this](BaseInformation& info) {
122+
return this->GetTCPStatInformationOnce(static_cast<TCPStatInformation&>(info));
123+
},
124+
tcpStatInfo,
125+
errorType);
126+
}
127+
128+
bool SystemInterface::GetNetRateInformation(NetRateInformation& netRateInfo) {
129+
const std::string errorType = "Net rate";
130+
return MemoizedCall(
131+
mNetRateInformationCache,
132+
[this](BaseInformation& info) {
133+
return this->GetNetRateInformationOnce(static_cast<NetRateInformation&>(info));
134+
},
135+
netRateInfo,
136+
errorType);
137+
}
138+
117139
template <typename F, typename InfoT, typename... Args>
118140
bool SystemInterface::MemoizedCall(
119141
SystemInformationCache<InfoT, Args...>& cache, F&& func, InfoT& info, const std::string& errorType, Args... args) {

0 commit comments

Comments
 (0)