@@ -1275,15 +1275,17 @@ void AppendMetricWithoutLabels(string_view name, string_view help, const absl::A
1275
1275
AppendMetricValue (name, value, {}, {}, dest);
1276
1276
}
1277
1277
1278
- void PrintPrometheusMetrics (const Metrics& m, DflyCmd* dfly_cmd, StringResponse* resp) {
1278
+ void PrintPrometheusMetrics (uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd,
1279
+ StringResponse* resp) {
1279
1280
// Server metrics
1280
1281
AppendMetricHeader (" version" , " " , MetricType::GAUGE, &resp->body ());
1281
1282
AppendMetricValue (" version" , 1 , {" version" }, {GetVersion ()}, &resp->body ());
1282
1283
1283
1284
bool is_master = ServerState::tlocal ()->is_master ;
1285
+
1284
1286
AppendMetricWithoutLabels (" master" , " 1 if master 0 if replica" , is_master ? 1 : 0 ,
1285
1287
MetricType::GAUGE, &resp->body ());
1286
- AppendMetricWithoutLabels (" uptime_in_seconds" , " " , m. uptime , MetricType::COUNTER, &resp->body ());
1288
+ AppendMetricWithoutLabels (" uptime_in_seconds" , " " , uptime, MetricType::COUNTER, &resp->body ());
1287
1289
1288
1290
// Clients metrics
1289
1291
const auto & conn_stats = m.facade_stats .conn_stats ;
@@ -1377,15 +1379,6 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse*
1377
1379
if (added)
1378
1380
absl::StrAppend (&resp->body (), type_used_memory_metric);
1379
1381
}
1380
- if (!m.master_side_replicas_info .empty ()) {
1381
- ReplicationMemoryStats repl_mem;
1382
- dfly_cmd->GetReplicationMemoryStats (&repl_mem);
1383
- AppendMetricWithoutLabels (
1384
- " replication_streaming_bytes" , " Stable sync replication memory usage" ,
1385
- repl_mem.streamer_buf_capacity_bytes , MetricType::GAUGE, &resp->body ());
1386
- AppendMetricWithoutLabels (" replication_full_sync_bytes" , " Full sync memory usage" ,
1387
- repl_mem.full_sync_buf_bytes , MetricType::GAUGE, &resp->body ());
1388
- }
1389
1382
1390
1383
// Stats metrics
1391
1384
AppendMetricWithoutLabels (" connections_received_total" , " " , conn_stats.conn_received_cnt ,
@@ -1458,11 +1451,25 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse*
1458
1451
absl::StrAppend (&resp->body (), command_metrics);
1459
1452
}
1460
1453
1461
- if (!m.master_side_replicas_info .empty ()) {
1454
+ if (m.replica_side_info ) { // slave side
1455
+ auto & replica_info = *m.replica_side_info ;
1456
+ AppendMetricWithoutLabels (" replica_reconnect_count" , " Number of replica reconnects" ,
1457
+ replica_info.reconnect_count , MetricType::COUNTER, &resp->body ());
1458
+ } else { // Master side
1462
1459
string replication_lag_metrics;
1460
+ vector<ReplicaRoleInfo> replicas_info = dfly_cmd->GetReplicasRoleInfo ();
1461
+
1462
+ ReplicationMemoryStats repl_mem;
1463
+ dfly_cmd->GetReplicationMemoryStats (&repl_mem);
1464
+ AppendMetricWithoutLabels (
1465
+ " replication_streaming_bytes" , " Stable sync replication memory usage" ,
1466
+ repl_mem.streamer_buf_capacity_bytes , MetricType::GAUGE, &resp->body ());
1467
+ AppendMetricWithoutLabels (" replication_full_sync_bytes" , " Full sync memory usage" ,
1468
+ repl_mem.full_sync_buf_bytes , MetricType::GAUGE, &resp->body ());
1469
+
1463
1470
AppendMetricHeader (" connected_replica_lag_records" , " Lag in records of a connected replica." ,
1464
1471
MetricType::GAUGE, &replication_lag_metrics);
1465
- for (const auto & replica : m. master_side_replicas_info ) {
1472
+ for (const auto & replica : replicas_info ) {
1466
1473
AppendMetricValue (" connected_replica_lag_records" , replica.lsn_lag ,
1467
1474
{" replica_ip" , " replica_port" , " replica_state" },
1468
1475
{replica.address , absl::StrCat (replica.listening_port ), replica.state },
@@ -1471,12 +1478,6 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse*
1471
1478
absl::StrAppend (&resp->body (), replication_lag_metrics);
1472
1479
}
1473
1480
1474
- if (m.replica_side_info ) {
1475
- auto & replica_info = *m.replica_side_info ;
1476
- AppendMetricWithoutLabels (" replica_reconnect_count" , " Number of replica reconnects" ,
1477
- replica_info.reconnect_count , MetricType::COUNTER, &resp->body ());
1478
- }
1479
-
1480
1481
AppendMetricWithoutLabels (" fiber_switch_total" , " " , m.fiber_switch_cnt , MetricType::COUNTER,
1481
1482
&resp->body ());
1482
1483
double delay_seconds = m.fiber_switch_delay_usec * 1e-6 ;
@@ -1533,7 +1534,8 @@ void ServerFamily::ConfigureMetrics(util::HttpListenerBase* http_base) {
1533
1534
1534
1535
auto cb = [this ](const util::http::QueryArgs& args, util::HttpContext* send) {
1535
1536
StringResponse resp = util::http::MakeStringResponse (boost::beast::http::status::ok);
1536
- PrintPrometheusMetrics (this ->GetMetrics (&namespaces->GetDefaultNamespace ()),
1537
+ uint64_t uptime = time (NULL ) - start_time_;
1538
+ PrintPrometheusMetrics (uptime, this ->GetMetrics (&namespaces->GetDefaultNamespace ()),
1537
1539
this ->dfly_cmd_ .get (), &resp);
1538
1540
1539
1541
return send->Invoke (std::move (resp));
@@ -1607,14 +1609,17 @@ void ServerFamily::StatsMC(std::string_view section, SinkReplyBuilder* builder)
1607
1609
1608
1610
double utime = dbl_time (ru.ru_utime );
1609
1611
double systime = dbl_time (ru.ru_stime );
1612
+ auto kind = ProactorBase::me ()->GetKind ();
1613
+ const char * multiplex_api = (kind == ProactorBase::IOURING) ? " iouring" : " epoll" ;
1610
1614
1611
1615
Metrics m = GetMetrics (&namespaces->GetDefaultNamespace ());
1616
+ uint64_t uptime = time (NULL ) - start_time_;
1612
1617
1613
1618
ADD_LINE (pid, getpid ());
1614
- ADD_LINE (uptime, m. uptime );
1619
+ ADD_LINE (uptime, uptime);
1615
1620
ADD_LINE (time, now);
1616
1621
ADD_LINE (version, kGitTag );
1617
- ADD_LINE (libevent, " iouring " );
1622
+ ADD_LINE (libevent, multiplex_api );
1618
1623
ADD_LINE (pointer_size, sizeof (void *));
1619
1624
ADD_LINE (rusage_user, utime);
1620
1625
ADD_LINE (rusage_system, systime);
@@ -2083,7 +2088,7 @@ static void MergeDbSliceStats(const DbSlice::Stats& src, Metrics* dest) {
2083
2088
2084
2089
void ServerFamily::ResetStat (Namespace* ns) {
2085
2090
shard_set->pool ()->AwaitBrief (
2086
- [registry = service_.mutable_registry (), this , ns](unsigned index, auto *) {
2091
+ [registry = service_.mutable_registry (), ns](unsigned index, auto *) {
2087
2092
registry->ResetCallStats (index);
2088
2093
ns->GetCurrentDbSlice ().ResetEvents ();
2089
2094
facade::ResetStats ();
@@ -2095,6 +2100,8 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
2095
2100
Metrics result;
2096
2101
util::fb2::Mutex mu;
2097
2102
2103
+ uint64_t start = absl::GetCurrentTimeNanos ();
2104
+
2098
2105
auto cmd_stat_cb = [&dest = result.cmd_stats_map ](string_view name, const CmdCallStats& stat) {
2099
2106
auto & [calls, sum] = dest[absl::AsciiStrToLower (name)];
2100
2107
calls += stat.first ;
@@ -2117,7 +2124,6 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
2117
2124
2118
2125
result.coordinator_stats .Add (ss->stats );
2119
2126
2120
- result.uptime = time (NULL ) - this ->start_time_ ;
2121
2127
result.qps += uint64_t (ss->MovingSum6 ());
2122
2128
result.facade_stats += *tl_facade_stats;
2123
2129
result.serialization_bytes += SliceSnapshot::GetThreadLocalMemoryUsage ();
@@ -2156,15 +2162,16 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
2156
2162
2157
2163
service_.proactor_pool ().AwaitFiberOnAll (std::move (cb));
2158
2164
2165
+ uint64_t after_cb = absl::GetCurrentTimeNanos ();
2166
+
2159
2167
// Normalize moving average stats
2160
2168
result.qps /= 6 ;
2161
2169
result.traverse_ttl_per_sec /= 6 ;
2162
2170
result.delete_ttl_per_sec /= 6 ;
2163
2171
2164
2172
bool is_master = ServerState::tlocal () && ServerState::tlocal ()->is_master ;
2165
- if (is_master) {
2166
- result.master_side_replicas_info = dfly_cmd_->GetReplicasRoleInfo ();
2167
- } else {
2173
+
2174
+ if (!is_master) {
2168
2175
auto info = GetReplicaSummary ();
2169
2176
if (info) {
2170
2177
result.replica_side_info = {
@@ -2187,6 +2194,12 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
2187
2194
2188
2195
result.peak_stats = peak_stats_;
2189
2196
2197
+ uint64_t delta_ms = (absl::GetCurrentTimeNanos () - start) / 1'000'000 ;
2198
+ if (delta_ms > 30 ) {
2199
+ uint64_t cb_dur = (after_cb - start) / 1'000'000 ;
2200
+ LOG (INFO) << " GetMetrics took " << delta_ms << " ms, out of which callback took " << cb_dur
2201
+ << " ms" ;
2202
+ }
2190
2203
return result;
2191
2204
}
2192
2205
@@ -2217,15 +2230,6 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
2217
2230
absl::StrAppend (&info, a1, " :" , a2, " \r\n " );
2218
2231
};
2219
2232
2220
- uint64_t start = absl::GetCurrentTimeNanos ();
2221
- Metrics m = GetMetrics (cntx->ns );
2222
- uint64_t delta_ms = (absl::GetCurrentTimeNanos () - start) / 1000'000 ;
2223
- LOG_IF (INFO, delta_ms > 100 ) << " GetMetrics took " << delta_ms << " ms" ;
2224
-
2225
- DbStats total;
2226
- for (const auto & db_stats : m.db_stats )
2227
- total += db_stats;
2228
-
2229
2233
if (should_enter (" SERVER" )) {
2230
2234
auto kind = ProactorBase::me ()->GetKind ();
2231
2235
const char * multiplex_api = (kind == ProactorBase::IOURING) ? " iouring" : " epoll" ;
@@ -2238,11 +2242,22 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
2238
2242
append (" multiplexing_api" , multiplex_api);
2239
2243
append (" tcp_port" , GetFlag (FLAGS_port));
2240
2244
append (" thread_count" , service_.proactor_pool ().size ());
2241
- size_t uptime = m.uptime ;
2245
+
2246
+ uint64_t uptime = time (NULL ) - start_time_;
2242
2247
append (" uptime_in_seconds" , uptime);
2243
2248
append (" uptime_in_days" , uptime / (3600 * 24 ));
2244
2249
}
2245
2250
2251
+ Metrics m;
2252
+ // Save time by not calculating metrics if we don't need them.
2253
+ if (!(section == " SERVER" || section == " REPLICATION" )) {
2254
+ m = GetMetrics (cntx->ns );
2255
+ }
2256
+
2257
+ DbStats total;
2258
+ for (const auto & db_stats : m.db_stats )
2259
+ total += db_stats;
2260
+
2246
2261
if (should_enter (" CLIENTS" )) {
2247
2262
append (" connected_clients" , m.facade_stats .conn_stats .num_conns );
2248
2263
append (" max_clients" , GetFlag (FLAGS_maxclients));
@@ -2310,7 +2325,7 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
2310
2325
append (" maxmemory_policy" , " noeviction" );
2311
2326
}
2312
2327
2313
- if (!m.master_side_replicas_info . empty ()) {
2328
+ if (!m.replica_side_info ) { // master
2314
2329
ReplicationMemoryStats repl_mem;
2315
2330
dfly_cmd_->GetReplicationMemoryStats (&repl_mem);
2316
2331
append (" replication_streaming_buffer_bytes" , repl_mem.streamer_buf_capacity_bytes );
@@ -2472,17 +2487,23 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
2472
2487
}
2473
2488
2474
2489
if (should_enter (" REPLICATION" )) {
2475
- util::fb2::LockGuard lk (replicaof_mu_) ;
2490
+ bool is_master = true ;
2476
2491
// Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
2477
2492
// ensuring eventual consistency of is_master. When determining if the server is a replica and
2478
2493
// accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
2479
2494
// insufficient in this scenario.
2480
- if (!replica_) {
2495
+ // Please note that we do not use the Metrics object here.
2496
+ {
2497
+ fb2::LockGuard lk (replicaof_mu_);
2498
+ is_master = !replica_;
2499
+ }
2500
+ if (is_master) {
2501
+ vector<ReplicaRoleInfo> replicas_info = dfly_cmd_->GetReplicasRoleInfo ();
2481
2502
append (" role" , " master" );
2482
- append (" connected_slaves" , m. facade_stats . conn_stats . num_replicas );
2483
- const auto & replicas = m. master_side_replicas_info ;
2484
- for (size_t i = 0 ; i < replicas .size (); i++) {
2485
- auto & r = replicas [i];
2503
+ append (" connected_slaves" , replicas_info. size () );
2504
+
2505
+ for (size_t i = 0 ; i < replicas_info .size (); i++) {
2506
+ auto & r = replicas_info [i];
2486
2507
// e.g. slave0:ip=172.19.0.3,port=6379,state=full_sync
2487
2508
append (StrCat (" slave" , i), StrCat (" ip=" , r.address , " ,port=" , r.listening_port ,
2488
2509
" ,state=" , r.state , " ,lag=" , r.lsn_lag ));
@@ -2505,6 +2526,8 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
2505
2526
append (" slave_priority" , GetFlag (FLAGS_replica_priority));
2506
2527
append (" slave_read_only" , 1 );
2507
2528
};
2529
+ fb2::LockGuard lk (replicaof_mu_);
2530
+
2508
2531
replication_info_cb (replica_->GetSummary ());
2509
2532
2510
2533
// Special case, when multiple masters replicate to a single replica.
@@ -2895,9 +2918,6 @@ void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder*
2895
2918
string sync_id = absl::StrCat (" SYNC" , sid);
2896
2919
cntx->conn_state .replication_info .repl_session_id = sid;
2897
2920
2898
- if (!cntx->replica_conn ) {
2899
- ServerState::tl_connection_stats ()->num_replicas += 1 ;
2900
- }
2901
2921
cntx->replica_conn = true ;
2902
2922
2903
2923
// The response for 'capa dragonfly' is: <masterid> <syncid> <numthreads> <version>
0 commit comments