@@ -1006,26 +1006,25 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
1006
1006
}
1007
1007
1008
1008
void ClusterFamily::BreakStalledFlowsInShard () {
1009
- std::unique_lock global_lock (migration_mu_, std::defer_lock);
1010
-
1011
1009
// give up on blocking because we run this function periodically in a background fiber,
1012
1010
// so it will eventually grab the lock.
1013
- if (!global_lock.try_lock ())
1014
- return ;
1015
-
1016
- int64_t timeout_ns = int64_t (absl::GetFlag (FLAGS_migration_timeout)) * 1'000'000LL ;
1017
- for (auto & om : outgoing_migration_jobs_) {
1018
- if (om->GetState () == MigrationState::C_FINISHED)
1019
- continue ;
1020
-
1021
- int64_t now = absl::GetCurrentTimeNanos ();
1022
- int64_t last_write_ns = om->GetShardLastWriteTime ();
1023
-
1024
- if (last_write_ns > 0 && last_write_ns + timeout_ns < now) {
1025
- LOG (WARNING) << " Source node detected migration timeout for: "
1026
- << om->GetMigrationInfo ().ToString ()
1027
- << " last_write_ms: " << last_write_ns / 1000'000 << " , now: " << now / 1000'000 ;
1028
- om->Finish (true , " Detected migration timeout" );
1011
+ if (migration_mu_.try_lock ()) {
1012
+ std::lock_guard lock (migration_mu_, std::adopt_lock);
1013
+ int64_t timeout_ns = int64_t (absl::GetFlag (FLAGS_migration_timeout)) * 1'000'000LL ;
1014
+ for (auto & om : outgoing_migration_jobs_) {
1015
+ if (om->GetState () == MigrationState::C_FINISHED)
1016
+ continue ;
1017
+
1018
+ int64_t now = absl::GetCurrentTimeNanos ();
1019
+ int64_t last_write_ns = om->GetShardLastWriteTime ();
1020
+
1021
+ if (last_write_ns > 0 && last_write_ns + timeout_ns < now) {
1022
+ LOG (WARNING) << " Source node detected migration timeout for: "
1023
+ << om->GetMigrationInfo ().ToString ()
1024
+ << " last_write_ms: " << last_write_ns / 1000'000
1025
+ << " , now: " << now / 1000'000 ;
1026
+ om->Finish (true , " Detected migration timeout" );
1027
+ }
1029
1028
}
1030
1029
}
1031
1030
}
0 commit comments