147
147
-export ([start_link /0 , start_link /2 , init /1 , handle_call /3 , handle_cast /2 ,
148
148
handle_info /2 , terminate /2 , code_change /3 , prioritise_cast /3 ]).
149
149
150
+ -export ([clear_metrics_of /1 , list_elders /0 , list_clients /0 , get_client_state /1 ]).
151
+
150
152
-define (SERVER , ? MODULE ).
151
153
% % Reserve 3 handles for ra usage: wal, segment writer and a dets table
152
154
-define (RESERVED_FOR_OTHERS , 100 + 3 ).
158
160
-define (CLIENT_ETS_TABLE , file_handle_cache_client ).
159
161
-define (ELDERS_ETS_TABLE , file_handle_cache_elders ).
160
162
163
+ -import (rabbit_misc , [safe_ets_update_counter /3 , safe_ets_update_counter /4 ,
164
+ safe_ets_update_element /3 , safe_ets_update_element /4 ]).
165
+
161
166
% %----------------------------------------------------------------------------
162
167
163
168
-record (file ,
@@ -621,6 +626,34 @@ clear_process_read_cache() ->
621
626
size (Handle # handle .read_buffer ) > 0
622
627
].
623
628
629
+ % % Only used for testing
630
+ clear_metrics_of (Pid ) ->
631
+ case whereis (? SERVER ) of
632
+ undefined -> ok ;
633
+ _ -> gen_server2 :cast (? SERVER , {clear_metrics_of , Pid })
634
+ end .
635
+
636
+ % % Only used for testing
637
+ list_elders () ->
638
+ case whereis (? SERVER ) of
639
+ undefined -> ok ;
640
+ _ -> gen_server2 :call (? SERVER , list_elders )
641
+ end .
642
+
643
+ % % Only used for testing
644
+ list_clients () ->
645
+ case whereis (? SERVER ) of
646
+ undefined -> ok ;
647
+ _ -> gen_server2 :call (? SERVER , list_clients )
648
+ end .
649
+
650
+ % % Only used for testing
651
+ get_client_state (Pid ) ->
652
+ case whereis (? SERVER ) of
653
+ undefined -> ok ;
654
+ _ -> gen_server2 :call (? SERVER , {get_client_state , Pid })
655
+ end .
656
+
624
657
% %----------------------------------------------------------------------------
625
658
% % Internal functions
626
659
% %----------------------------------------------------------------------------
@@ -1124,13 +1157,13 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
1124
1157
case needs_reduce (State # fhc_state { open_count = Count + Requested }) of
1125
1158
true -> case ets :lookup (Clients , Pid ) of
1126
1159
[# cstate { opened = 0 }] ->
1127
- true = ets : update_element (
1160
+ _ = safe_ets_update_element (
1128
1161
Clients , Pid , {# cstate .blocked , true }),
1129
1162
{noreply ,
1130
1163
reduce (State # fhc_state {
1131
1164
open_pending = pending_in (Item , Pending ) })};
1132
1165
[# cstate { opened = Opened }] ->
1133
- true = ets : update_element (
1166
+ _ = safe_ets_update_element (
1134
1167
Clients , Pid ,
1135
1168
{# cstate .pending_closes , Opened }),
1136
1169
{reply , close , State }
@@ -1146,7 +1179,7 @@ handle_call({obtain, N, Type, Pid}, From,
1146
1179
Item = # pending { kind = {obtain , Type }, pid = Pid ,
1147
1180
requested = N , from = From },
1148
1181
Enqueue = fun () ->
1149
- true = ets : update_element (Clients , Pid ,
1182
+ _ = safe_ets_update_element (Clients , Pid ,
1150
1183
{# cstate .blocked , true }),
1151
1184
set_obtain_state (Type , pending ,
1152
1185
pending_in (Item , Pending ), State )
@@ -1174,12 +1207,21 @@ handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
1174
1207
{reply , Limit , State };
1175
1208
1176
1209
handle_call ({info , Items }, _From , State ) ->
1177
- {reply , infos (Items , State ), State }.
1210
+ {reply , infos (Items , State ), State };
1211
+
1212
+ handle_call (list_elders , _From , State = # fhc_state { elders = Elders }) ->
1213
+ {reply , ets :tab2list (Elders ), State };
1214
+
1215
+ handle_call (list_clients , _From , State = # fhc_state { clients = Clients }) ->
1216
+ {reply , ets :tab2list (Clients ), State };
1217
+
1218
+ handle_call ({get_client_state , ID }, _From , State = # fhc_state { clients = Clients }) ->
1219
+ {reply , ets :lookup (Clients , ID ), State }.
1178
1220
1179
1221
handle_cast ({register_callback , Pid , MFA },
1180
1222
State = # fhc_state { clients = Clients }) ->
1181
1223
ok = track_client (Pid , Clients ),
1182
- true = ets : update_element (Clients , Pid , {# cstate .callback , MFA }),
1224
+ _ = safe_ets_update_element (Clients , Pid , {# cstate .callback , MFA }),
1183
1225
{noreply , State };
1184
1226
1185
1227
handle_cast ({update , Pid , EldestUnusedSince },
@@ -1200,7 +1242,7 @@ handle_cast({close, Pid, EldestUnusedSince},
1200
1242
undefined -> ets :delete (Elders , Pid );
1201
1243
_ -> ets :insert (Elders , {Pid , EldestUnusedSince })
1202
1244
end ,
1203
- ets : update_counter (Clients , Pid , {# cstate .pending_closes , - 1 , 0 , 0 }),
1245
+ safe_ets_update_counter (Clients , Pid , {# cstate .pending_closes , - 1 , 0 , 0 }),
1204
1246
{noreply , adjust_alarm (State , process_pending (
1205
1247
update_counts (open , Pid , - 1 , State )))};
1206
1248
@@ -1226,7 +1268,15 @@ handle_cast({set_reservation, N, Type, Pid},
1226
1268
{noreply , case needs_reduce (NewState ) of
1227
1269
true -> reduce (NewState );
1228
1270
false -> adjust_alarm (State , NewState )
1229
- end }.
1271
+ end };
1272
+
1273
+ handle_cast ({clear_metrics_of , Pid },
1274
+ State = # fhc_state { elders = Elders , clients = Clients }) ->
1275
+ ets :delete (Elders , Pid ),
1276
+ ets :delete (Clients , Pid ),
1277
+ safe_ets_update_counter (Clients , Pid , {# cstate .pending_closes , - 1 , 0 , 0 }),
1278
+ {noreply , adjust_alarm (State , process_pending (
1279
+ update_counts (open , Pid , - 1 , State )))}.
1230
1280
1231
1281
handle_info (check_counts , State ) ->
1232
1282
{noreply , maybe_reduce (State # fhc_state { timer_ref = undefined })};
@@ -1399,37 +1449,42 @@ run_pending_item(#pending { kind = Kind,
1399
1449
from = From },
1400
1450
State = # fhc_state { clients = Clients }) ->
1401
1451
gen_server2 :reply (From , ok ),
1402
- true = ets : update_element (Clients , Pid , {# cstate .blocked , false }),
1452
+ safe_ets_update_element (Clients , Pid , {# cstate .blocked , false }),
1403
1453
update_counts (Kind , Pid , Requested , State ).
1404
1454
1405
1455
update_counts (open , Pid , Delta ,
1406
1456
State = # fhc_state { open_count = OpenCount ,
1407
1457
clients = Clients }) ->
1408
- ets :update_counter (Clients , Pid , {# cstate .opened , Delta }),
1458
+ safe_ets_update_counter (Clients , Pid , {# cstate .opened , Delta },
1459
+ fun () -> rabbit_log :warning (" FHC: failed to update counter 'opened', client pid: ~p " , [Pid ]) end ),
1409
1460
State # fhc_state { open_count = OpenCount + Delta };
1410
1461
update_counts ({obtain , file }, Pid , Delta ,
1411
1462
State = # fhc_state {obtain_count_file = ObtainCountF ,
1412
1463
clients = Clients }) ->
1413
- ets :update_counter (Clients , Pid , {# cstate .obtained_file , Delta }),
1464
+ safe_ets_update_counter (Clients , Pid , {# cstate .obtained_file , Delta },
1465
+ fun () -> rabbit_log :warning (" FHC: failed to update counter 'obtained_file', client pid: ~p " , [Pid ]) end ),
1414
1466
State # fhc_state { obtain_count_file = ObtainCountF + Delta };
1415
1467
update_counts ({obtain , socket }, Pid , Delta ,
1416
1468
State = # fhc_state {obtain_count_socket = ObtainCountS ,
1417
1469
clients = Clients }) ->
1418
- ets :update_counter (Clients , Pid , {# cstate .obtained_socket , Delta }),
1470
+ safe_ets_update_counter (Clients , Pid , {# cstate .obtained_socket , Delta },
1471
+ fun () -> rabbit_log :warning (" FHC: failed to update counter 'obtained_socket', client pid: ~p " , [Pid ]) end ),
1419
1472
State # fhc_state { obtain_count_socket = ObtainCountS + Delta };
1420
1473
update_counts ({reserve , file }, Pid , NewReservation ,
1421
1474
State = # fhc_state {reserve_count_file = ReserveCountF ,
1422
1475
clients = Clients }) ->
1423
1476
[# cstate {reserved_file = R }] = ets :lookup (Clients , Pid ),
1424
1477
Delta = NewReservation - R ,
1425
- ets :update_counter (Clients , Pid , {# cstate .reserved_file , Delta }),
1478
+ safe_ets_update_counter (Clients , Pid , {# cstate .reserved_file , Delta },
1479
+ fun () -> rabbit_log :warning (" FHC: failed to update counter 'reserved_file', client pid: ~p " , [Pid ]) end ),
1426
1480
State # fhc_state { reserve_count_file = ReserveCountF + Delta };
1427
1481
update_counts ({reserve , socket }, Pid , NewReservation ,
1428
1482
State = # fhc_state {reserve_count_socket = ReserveCountS ,
1429
1483
clients = Clients }) ->
1430
1484
[# cstate {reserved_file = R }] = ets :lookup (Clients , Pid ),
1431
1485
Delta = NewReservation - R ,
1432
- ets :update_counter (Clients , Pid , {# cstate .reserved_socket , Delta }),
1486
+ safe_ets_update_counter (Clients , Pid , {# cstate .reserved_socket , Delta },
1487
+ fun () -> rabbit_log :warning (" FHC: failed to update counter 'reserved_socket', client pid: ~p " , [Pid ]) end ),
1433
1488
State # fhc_state { reserve_count_socket = ReserveCountS + Delta }.
1434
1489
1435
1490
maybe_reduce (State ) ->
@@ -1520,7 +1575,7 @@ notify(Clients, Required, [#cstate{ pid = Pid,
1520
1575
callback = {M , F , A },
1521
1576
opened = Opened } | Notifications ]) ->
1522
1577
apply (M , F , A ++ [0 ]),
1523
- ets : update_element (Clients , Pid , {# cstate .pending_closes , Opened }),
1578
+ safe_ets_update_element (Clients , Pid , {# cstate .pending_closes , Opened }),
1524
1579
notify (Clients , Required - Opened , Notifications ).
1525
1580
1526
1581
track_client (Pid , Clients ) ->
0 commit comments