Skip to content

Commit 5da995d

Browse files
committed
chore: Log db_index in traffic logger
1 parent 1a5eacc commit 5da995d

File tree

7 files changed

+77
-53
lines changed

7 files changed

+77
-53
lines changed

src/facade/dragonfly_connection.cc

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,13 @@ void OpenTrafficLogger(string_view base_path) {
164164
#else
165165
LOG(WARNING) << "Traffic logger is only supported on Linux";
166166
#endif
167+
168+
// Write version, incremental numbering :)
169+
tl_traffic_logger.log_file->Write(io::Bytes{2});
167170
}
168171

169-
void LogTraffic(uint32_t id, bool has_more, absl::Span<RespExpr> resp) {
172+
void LogTraffic(uint32_t id, bool has_more, absl::Span<RespExpr> resp,
173+
ServiceInterface::ContextInfo ci) {
170174
string_view cmd = resp.front().GetView();
171175
if (absl::EqualsIgnoreCase(cmd, "debug"sv))
172176
return;
@@ -176,28 +180,33 @@ void LogTraffic(uint32_t id, bool has_more, absl::Span<RespExpr> resp) {
176180
char stack_buf[1024];
177181
char* next = stack_buf;
178182

179-
// We write id, timestamp, has_more, num_parts, part_len, part_len, part_len, ...
183+
// We write id, timestamp, db_index, has_more, num_parts, part_len, part_len, part_len, ...
180184
// And then all the part blobs concatenated together.
181185
auto write_u32 = [&next](uint32_t i) {
182186
absl::little_endian::Store32(next, i);
183187
next += 4;
184188
};
185189

190+
// id
186191
write_u32(id);
187192

193+
// timestamp
188194
absl::little_endian::Store64(next, absl::GetCurrentTimeNanos());
189195
next += 8;
190196

197+
// db_index
198+
write_u32(ci.db_index);
199+
200+
// has_more, num_parts
191201
write_u32(has_more ? 1 : 0);
192202
write_u32(uint32_t(resp.size()));
193203

194204
// Grab the lock and check if the file is still open.
195205
lock_guard lk{tl_traffic_logger.mutex};
196-
197206
if (!tl_traffic_logger.log_file)
198207
return;
199208

200-
// Proceed with writing the blob lengths.
209+
// part_len, ...
201210
for (auto part : resp) {
202211
if (size_t(next - stack_buf + 4) > sizeof(stack_buf)) {
203212
if (!tl_traffic_logger.Write(string_view{stack_buf, size_t(next - stack_buf)})) {
@@ -743,7 +752,7 @@ std::pair<std::string, std::string> Connection::GetClientInfoBeforeAfterTid() co
743752
string_view phase_name = PHASE_NAMES[phase_];
744753

745754
if (cc_) {
746-
string cc_info = service_->GetContextInfo(cc_.get());
755+
string cc_info = service_->GetContextInfo(cc_.get()).Format();
747756
if (cc_->reply_builder()->IsSendActive())
748757
phase_name = "send";
749758
absl::StrAppend(&after, " ", cc_info);
@@ -921,7 +930,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
921930
}
922931

923932
if (ec && !FiberSocketBase::IsConnClosed(ec)) {
924-
string conn_info = service_->GetContextInfo(cc_.get());
933+
string conn_info = service_->GetContextInfo(cc_.get()).Format();
925934
LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName()
926935
<< " during phase " << kPhaseName[phase_] << " : " << ec << " " << ec.message();
927936
}
@@ -983,10 +992,8 @@ Connection::ParserStatus Connection::ParseRedis(SinkReplyBuilder* orig_builder)
983992

984993
bool has_more = consumed < io_buf_.InputLen();
985994

986-
if (tl_traffic_logger.log_file) {
987-
if (IsMain()) { // log only on the main interface.
988-
LogTraffic(id_, has_more, absl::MakeSpan(parse_args));
989-
}
995+
if (tl_traffic_logger.log_file && IsMain() /* log only on the main interface */) {
996+
LogTraffic(id_, has_more, absl::MakeSpan(parse_args), service_->GetContextInfo(cc_.get()));
990997
}
991998
DispatchCommand(has_more, dispatch_sync, dispatch_async);
992999
}

src/facade/service_interface.cc

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright 2024, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#include "facade/service_interface.h"
6+
7+
#include <absl/strings/str_cat.h>
8+
9+
namespace facade {
10+
11+
std::string ServiceInterface::ContextInfo::Format() const {
12+
char buf[16] = {0};
13+
std::string res = absl::StrCat("db=", db_index);
14+
15+
unsigned index = 0;
16+
17+
if (async_dispatch)
18+
buf[index++] = 'a';
19+
20+
if (conn_closing)
21+
buf[index++] = 't';
22+
23+
if (subscribers)
24+
buf[index++] = 'P';
25+
26+
if (blocked)
27+
buf[index++] = 'b';
28+
29+
if (index)
30+
absl::StrAppend(&res, " flags=", buf);
31+
return res;
32+
}
33+
34+
} // namespace facade

src/facade/service_interface.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,14 @@ class ServiceInterface {
4242
virtual void OnClose(ConnectionContext* cntx) {
4343
}
4444

45-
virtual std::string GetContextInfo(ConnectionContext* cntx) {
45+
struct ContextInfo {
46+
std::string Format() const;
47+
48+
unsigned db_index;
49+
bool async_dispatch, conn_closing, subscribers, blocked;
50+
};
51+
52+
virtual ContextInfo GetContextInfo(ConnectionContext* cntx) const {
4653
return {};
4754
}
4855
};

src/server/main_service.cc

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2539,29 +2539,13 @@ void Service::OnClose(facade::ConnectionContext* cntx) {
25392539
cntx->conn()->SetClientTrackingSwitch(false);
25402540
}
25412541

2542-
string Service::GetContextInfo(facade::ConnectionContext* cntx) {
2543-
char buf[16] = {0};
2544-
unsigned index = 0;
2542+
Service::ContextInfo Service::GetContextInfo(facade::ConnectionContext* cntx) const {
25452543
ConnectionContext* server_cntx = static_cast<ConnectionContext*>(cntx);
2546-
2547-
string res = absl::StrCat("db=", server_cntx->db_index());
2548-
2549-
if (server_cntx->async_dispatch)
2550-
buf[index++] = 'a';
2551-
2552-
if (server_cntx->conn_closing)
2553-
buf[index++] = 't';
2554-
2555-
if (server_cntx->conn_state.subscribe_info)
2556-
buf[index++] = 'P';
2557-
2558-
if (server_cntx->blocked)
2559-
buf[index++] = 'b';
2560-
2561-
if (index) {
2562-
absl::StrAppend(&res, " flags=", buf);
2563-
}
2564-
return res;
2544+
return {.db_index = server_cntx->db_index(),
2545+
.async_dispatch = server_cntx->async_dispatch,
2546+
.conn_closing = server_cntx->conn_closing,
2547+
.subscribers = bool(server_cntx->conn_state.subscribe_info),
2548+
.blocked = server_cntx->blocked};
25652549
}
25662550

25672551
using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx);

src/server/main_service.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ class Service : public facade::ServiceInterface {
9494

9595
void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) final;
9696
void OnClose(facade::ConnectionContext* cntx) final;
97-
std::string GetContextInfo(facade::ConnectionContext* cntx) final;
97+
98+
Service::ContextInfo GetContextInfo(facade::ConnectionContext* cntx) const final;
9899

99100
uint32_t shard_count() const {
100101
return shard_set->size();

tools/replay/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ var fClientBuffer = flag.Int("buffer", 100, "How many records to buffer per clie
2121
type RecordHeader struct {
2222
Client uint32
2323
Time uint64
24+
DbIndex uint32
2425
HasMore uint32
2526
}
2627

@@ -46,6 +47,7 @@ func DetermineBaseTime(files []string) time.Time {
4647
// Handles a single connection/client
4748
type ClientWorker struct {
4849
redis *redis.Client
50+
dbIndex uint32
4951
incoming chan Record
5052
}
5153

tools/replay/parsing.go

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package main
33
import (
44
"bufio"
55
"encoding/binary"
6-
"errors"
76
"io"
87
"log"
98
"os"
@@ -29,25 +28,8 @@ func parseStrings(file io.Reader) (out []interface{}, err error) {
2928

3029
for i := range out {
3130
strLen = out[i].(uint32)
32-
33-
if strLen == 0 {
34-
err = binary.Read(file, binary.LittleEndian, &strLen)
35-
if err != nil {
36-
return nil, err
37-
}
38-
39-
if strLen > 100000000 {
40-
log.Printf("Bad string length %v, index %v out of %v", strLen, i, num)
41-
for j := 0; j < i; j++ {
42-
log.Printf("Str %v %v", j, out[j])
43-
}
44-
return nil, errors.New("failed to parse a string len ")
45-
}
46-
out[i] = kBigEmptyBytes[:strLen]
47-
continue
48-
}
49-
5031
buf := make([]byte, strLen)
32+
5133
_, err := io.ReadFull(file, buf)
5234
if err != nil {
5335
return nil, err
@@ -66,6 +48,13 @@ func parseRecords(filename string, cb func(Record) bool) error {
6648
defer file.Close()
6749

6850
reader := bufio.NewReader(file)
51+
52+
var version uint8
53+
binary.Read(reader, binary.LittleEndian, &version)
54+
if version != 2 {
55+
panic("Requires version two replayer, roll back in commits!")
56+
}
57+
6958
recordNum := 0
7059
for {
7160
var rec Record

0 commit comments

Comments
 (0)