Skip to content

Binary cache: async push_success #908

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
95f0438
Binary cache: async push_success
autoantwort Feb 15, 2023
9d999d8
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Feb 28, 2023
163d9cd
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Mar 2, 2023
2a54205
Apply suggestions from code review
autoantwort Mar 2, 2023
0912655
Adapt code review
autoantwort Mar 2, 2023
5d7288c
Update src/vcpkg/binarycaching.cpp
autoantwort Mar 2, 2023
10189ac
Adapt code review
autoantwort Mar 2, 2023
2567607
Remove unnecessary actions_to_push_notifier.notify_all()
autoantwort Mar 2, 2023
ecdd000
Prevent deadlock and don't be on the crtl+c path
autoantwort Mar 2, 2023
8e7ae61
Add and use BGMessageSink to print IBinaryProvider::push_success mess…
autoantwort Mar 3, 2023
850d7c9
Restore old upload message
autoantwort Mar 3, 2023
548be38
Don't join yourself
autoantwort Mar 4, 2023
6dbbf06
Print messages about remaining packages to upload
autoantwort Mar 4, 2023
74b86fd
Localization
autoantwort Mar 5, 2023
5171d3e
Improve messages
autoantwort Mar 5, 2023
d69ed8f
No singleton and explicit calls to wait_for_async_complete()
autoantwort Mar 5, 2023
2df42d5
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Mar 8, 2023
5f1786e
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Mar 10, 2023
93303c3
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Mar 16, 2023
8a26c8b
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Mar 19, 2023
aa7e52f
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Mar 22, 2023
d46a4d6
Apply code review
autoantwort Mar 22, 2023
5e51718
Trigger Build
autoantwort Mar 22, 2023
a9ac558
No rename dance
autoantwort Mar 22, 2023
4faf674
Print upload to provider only once and not once per provider
autoantwort Mar 22, 2023
b9be8c6
Fix tests
autoantwort Mar 22, 2023
78ca081
Don't create unnecessary strings
autoantwort Mar 31, 2023
579bfa9
Rename to m_published_lock
autoantwort Mar 31, 2023
103968e
BinaryPackageInformation use Optional and make BinaryProviderPushRequ…
autoantwort Mar 31, 2023
dd32416
Merge branch 'main' into feature/async-binary-cache-push-success and …
autoantwort May 31, 2023
b666f94
Add missing files
autoantwort May 31, 2023
15bb503
Add missing includes
autoantwort May 31, 2023
d995bfd
Make BianryCache a unique_ptr
autoantwort May 31, 2023
24cd026
Reduce changes
autoantwort May 31, 2023
92fc76b
Fix output
autoantwort May 31, 2023
3527227
Fix bug
autoantwort May 31, 2023
48305b3
Format
autoantwort May 31, 2023
27fa076
Use lock_guard
autoantwort May 31, 2023
bcd459a
Revert "Use lock_guard"
autoantwort May 31, 2023
f958d36
Use enum
autoantwort May 31, 2023
7a24007
BGMessageSink::print_published apply code review
autoantwort May 31, 2023
50114f9
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Jun 14, 2023
ca5f2b1
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Aug 19, 2023
eccd9ee
Fix typo
autoantwort Aug 24, 2023
e7837e0
Fix typo in file name
autoantwort Aug 24, 2023
969e7fc
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Aug 24, 2023
2d5586f
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Oct 11, 2023
809d0b6
Renamings
autoantwort Oct 11, 2023
455e29b
format
autoantwort Oct 12, 2023
03fdfea
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Nov 4, 2023
f4bad8c
BinaryCache and std::unique_ptr
autoantwort Nov 4, 2023
26bbbd5
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Nov 12, 2023
814e434
BinaryCache: save data in std::unique_ptr so that the object can be m…
autoantwort Nov 14, 2023
290e586
fix
autoantwort Nov 14, 2023
3cc3378
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Dec 13, 2023
978ceae
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Nov 18, 2024
47b56ce
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Dec 25, 2024
061e6e8
Merge branch 'main' into feature/async-binary-cache-push-success
autoantwort Jan 14, 2025
050c51f
Merge remote-tracking branch 'origin/main' into feature/async-binary-…
BillyONeal Jan 15, 2025
8182732
Merge remote-tracking branch 'origin/main' into feature/async-binary-…
BillyONeal Jan 24, 2025
139c7da
Change find_last test to something that find (forward) won't pass.
BillyONeal Dec 9, 2024
4f410f1
Collapse the background work queue system to handle completion of the…
BillyONeal Dec 9, 2024
73b693a
Change BinaryCache and ZipTool's interface to avoid needing to copy o…
BillyONeal Jan 31, 2025
54fe17f
Merge remote-tracking branch 'BillyONeal/contextize-ziptool' into fea…
BillyONeal Jan 31, 2025
27780f9
Fixed upload status being printed to the terminal without synchroniza…
BillyONeal Jan 31, 2025
8e15cf4
Use any_of, put a member FileSystem& back into BinaryCache, make sure…
BillyONeal Jan 31, 2025
21de05a
* Combine submitted/completed counts into one atomic.
BillyONeal Jan 31, 2025
8f372b2
Change submission count message slightly and avoid printing from BG t…
BillyONeal Feb 3, 2025
8ecfdaa
Make the message count pettier.
BillyONeal Feb 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/vcpkg/base/fwd/message_sinks.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ namespace vcpkg

struct FileSink;
struct CombiningSink;
struct BGMessageSink;
}
7 changes: 2 additions & 5 deletions include/vcpkg/base/message-data.inc.h
Original file line number Diff line number Diff line change
Expand Up @@ -2276,6 +2276,7 @@ DECLARE_MESSAGE(SpecifyTargetArch,
"Specify the target architecture triplet. See 'vcpkg help triplet'.\n(default: '{env_var}')")
DECLARE_MESSAGE(StartCodeUnitInContinue, (), "", "found start code unit in continue position")
DECLARE_MESSAGE(StoredBinaryCache, (msg::path), "", "Stored binary cache: \"{path}\"")
DECLARE_MESSAGE(StoredBinariesToDestinations, (msg::count), "", "Stored binaries in {count} destinations.")
DECLARE_MESSAGE(StoreOptionMissingSha, (), "", "--store option is invalid without a sha512")
DECLARE_MESSAGE(SuccessfulyExported, (msg::package_name, msg::path), "", "Exported {package_name} to {path}")
DECLARE_MESSAGE(SuggestGitPull, (), "", "The result may be outdated. Run `git pull` to get the latest results.")
Expand Down Expand Up @@ -2527,11 +2528,6 @@ DECLARE_MESSAGE(
(),
"",
"If you are sure you want to rebuild the above packages, run this command with the --no-dry-run option.")
DECLARE_MESSAGE(UploadedBinaries, (msg::count, msg::vendor), "", "Uploaded binaries to {count} {vendor}.")
DECLARE_MESSAGE(UploadedPackagesToVendor,
(msg::count, msg::elapsed, msg::vendor),
"",
"Uploaded {count} package(s) to {vendor} in {elapsed}")
DECLARE_MESSAGE(UploadingBinariesToVendor,
(msg::spec, msg::vendor, msg::path),
"",
Expand Down Expand Up @@ -2765,6 +2761,7 @@ DECLARE_MESSAGE(VSExaminedPaths, (), "", "The following paths were examined for
DECLARE_MESSAGE(VSNoInstances, (), "", "Could not locate a complete Visual Studio instance")
DECLARE_MESSAGE(WaitingForChildrenToExit, (), "", "Waiting for child processes to exit...")
DECLARE_MESSAGE(WaitingToTakeFilesystemLock, (msg::path), "", "waiting to take filesystem lock on {path}...")
DECLARE_MESSAGE(WaitUntilPackagesUploaded, (msg::count), "", "Wait until the remaining packages ({count}) are uploaded")
DECLARE_MESSAGE(WarningMessage, (), "", "warning: ")
DECLARE_MESSAGE(WarningMessageMustUsePrintWarning,
(msg::value),
Expand Down
29 changes: 29 additions & 0 deletions include/vcpkg/base/message_sinks.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <vcpkg/base/files.h>
#include <vcpkg/base/messages.h>

#include <mutex>

namespace vcpkg
{

Expand Down Expand Up @@ -87,4 +89,31 @@ namespace vcpkg
CombiningSink(MessageSink& first, MessageSink& second) : m_first(first), m_second(second) { }
void print(Color c, StringView sv) override;
};

struct BGMessageSink : MessageSink
{
BGMessageSink(MessageSink& out_sink) : out_sink(out_sink) { }
~BGMessageSink() { publish_directly_to_out_sink(); }
// must be called from producer
void print(Color c, StringView sv) override;
using MessageSink::print;

// must be called from consumer (synchronizer of out)
void print_published();

void publish_directly_to_out_sink();

private:
MessageSink& out_sink;

std::mutex m_lock;
// guarded by m_lock
std::vector<std::pair<Color, std::string>> m_published;
// buffers messages until newline is reached
// guarded by m_print_directly_lock
std::vector<std::pair<Color, std::string>> m_unpublished;

std::mutex m_print_directly_lock;
bool m_print_directly_to_out_sink = false;
};
}
57 changes: 53 additions & 4 deletions include/vcpkg/binarycaching.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@
#include <vcpkg/base/downloads.h>
#include <vcpkg/base/expected.h>
#include <vcpkg/base/files.h>
#include <vcpkg/base/message_sinks.h>

#include <vcpkg/packagespec.h>
#include <vcpkg/sourceparagraph.h>

#include <condition_variable>
#include <iterator>
#include <queue>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -42,6 +47,25 @@ namespace vcpkg
const IBinaryProvider* m_available_provider = nullptr; // meaningful iff m_status == available
};

struct BinaryPackageInformation
{
explicit BinaryPackageInformation(const InstallPlanAction& action, std::string&& nuspec = "");
std::string package_abi;
PackageSpec spec;
std::string raw_version;
std::string nuspec; // only filled if BinaryCache has a provider that returns true for needs_nuspec_data()
};

struct BinaryProviderPushRequest
{
BinaryProviderPushRequest(BinaryPackageInformation&& info, Path package_dir)
: info(std::move(info)), package_dir(std::move(package_dir))
{
}
BinaryPackageInformation info;
Path package_dir;
};

struct IBinaryProvider
{
virtual ~IBinaryProvider() = default;
Expand All @@ -52,7 +76,8 @@ namespace vcpkg

/// Called upon a successful build of `action` to store those contents in the binary cache.
/// Prerequisite: action has a package_abi()
virtual void push_success(const InstallPlanAction& action) const = 0;
/// returns the number of successful uploads
virtual int push_success(const BinaryProviderPushRequest& request, MessageSink& msg_sink) = 0;

/// Gives the IBinaryProvider an opportunity to batch any downloading or server communication for
/// executing `actions`.
Expand All @@ -68,6 +93,8 @@ namespace vcpkg
/// to the action at the same index in `actions`. The provider must mark the cache status as appropriate.
/// Prerequisite: `actions` have package ABIs.
virtual void precheck(View<InstallPlanAction> actions, View<CacheStatus*> cache_status) const = 0;

virtual bool needs_nuspec_data() const { return false; }
};

struct UrlTemplate
Expand All @@ -77,7 +104,7 @@ namespace vcpkg
std::vector<std::string> headers_for_get;

LocalizedString valid() const;
std::string instantiate_variables(const InstallPlanAction& action) const;
std::string instantiate_variables(const BinaryPackageInformation& info) const;
};

struct BinaryConfigParserState
Expand Down Expand Up @@ -124,17 +151,21 @@ namespace vcpkg

struct BinaryCache
{
BinaryCache() = default;
BinaryCache(Filesystem& filesystem);
explicit BinaryCache(const VcpkgCmdArguments& args, const VcpkgPaths& paths);

~BinaryCache();

void install_providers(std::vector<std::unique_ptr<IBinaryProvider>>&& providers);
void install_providers_for(const VcpkgCmdArguments& args, const VcpkgPaths& paths);

/// Attempts to restore the package referenced by `action` into the packages directory.
RestoreResult try_restore(const InstallPlanAction& action);

/// Called upon a successful build of `action` to store those contents in the binary cache.
void push_success(const InstallPlanAction& action);
void push_success(const InstallPlanAction& action, Path package_dir);

void print_push_success_messages();

/// Gives the IBinaryProvider an opportunity to batch any downloading or server communication for
/// executing `actions`.
Expand All @@ -145,9 +176,27 @@ namespace vcpkg
/// Returns a vector where each index corresponds to the matching index in `actions`.
std::vector<CacheAvailability> precheck(View<InstallPlanAction> actions);

void wait_for_async_complete();

private:
struct ActionToPush
{
BinaryProviderPushRequest request;
bool clean_after_push = false;
};
void push_thread_main();

BGMessageSink bg_msg_sink;
std::unordered_map<std::string, CacheStatus> m_status;
std::vector<std::unique_ptr<IBinaryProvider>> m_providers;
bool needs_nuspec_data = false;
std::condition_variable actions_to_push_notifier;
std::mutex actions_to_push_mutex;
std::vector<ActionToPush> actions_to_push;
std::thread push_thread;
std::atomic_bool end_push_thread;
std::atomic_int remaining_packages_to_push = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A rule of thumb is std::condition_variable + std::atomic == deadlock, which is the case here. Given threads FG and BG, the following execution is possible:

BG: actions_to_push_notifier.wait(pred)
BG: runs pred(), end_push_thread.load(), returns false
FG: end_push_thread.store(true)
FG: actions_to_push_notifier.notify_all()
BG: go to sleep on the CV
FG: BG.join()

result: Deadlock

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also if atomic is kept I prefer atomic<bool> and atomic<int> over the C compat hack atomic_bool and atomic_int.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like remaining_packages_to_push should be the sum of the vector sizes rather than tracked as a separate atomic var?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! The documentation at cppreference also mentions that in the first block 🤦 .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not exactly the sum. It is the size of one vector + the remaining elements of a second vector that is currently processed. (And the second vector is currently not available where remaining_packages_to_push is available)

Filesystem& filesystem;
};

ExpectedL<DownloadManagerConfig> parse_download_configuration(const Optional<std::string>& arg);
Expand Down
5 changes: 5 additions & 0 deletions include/vcpkg/binarycaching.private.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <vcpkg/base/strings.h>

#include <vcpkg/binarycaching.h>
#include <vcpkg/dependencies.h>

namespace vcpkg
Expand Down Expand Up @@ -35,6 +36,10 @@ namespace vcpkg
{
return {Strings::concat(prefix, spec.dir()), format_version_for_nugetref(raw_version, abi_tag)};
}
inline NugetReference make_nugetref(const BinaryPackageInformation& info, const std::string& prefix)
{
return make_nugetref(info.spec, info.raw_version, info.package_abi, prefix);
}
inline NugetReference make_nugetref(const InstallPlanAction& action, const std::string& prefix)
{
return make_nugetref(action.spec,
Expand Down
1 change: 0 additions & 1 deletion include/vcpkg/build.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ namespace vcpkg
ExtendedBuildResult build_package(const VcpkgCmdArguments& args,
const VcpkgPaths& paths,
const InstallPlanAction& config,
BinaryCache& binary_cache,
const IBuildLogsRecorder& build_logs_recorder,
const StatusParagraphs& status_db);

Expand Down
1 change: 1 addition & 0 deletions include/vcpkg/fwd/binarycaching.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ namespace vcpkg
struct IBinaryProvider;
struct BinaryCache;
struct BinaryConfigParserState;
struct BinaryProviderPushRequest;
}
8 changes: 4 additions & 4 deletions locales/messages.json
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,8 @@
"_SpecifyTargetArch.comment": "An example of {env_var} is VCPKG_DEFAULT_TRIPLET.",
"StartCodeUnitInContinue": "found start code unit in continue position",
"StoreOptionMissingSha": "--store option is invalid without a sha512",
"StoredBinariesToDestinations": "Stored binaries in {count} destinations.",
"_StoredBinariesToDestinations.comment": "An example of {count} is 42.",
"StoredBinaryCache": "Stored binary cache: \"{path}\"",
"_StoredBinaryCache.comment": "An example of {path} is /foo/bar.",
"SuccessfulyExported": "Exported {package_name} to {path}",
Expand Down Expand Up @@ -1410,10 +1412,6 @@
"_UpdateBaselineUpdatedBaseline.comment": "example of {old_value}, {new_value} is '5507daa796359fe8d45418e694328e878ac2b82f' An example of {url} is https://github.com/microsoft/vcpkg.",
"UpgradeInManifest": "The upgrade command does not currently support manifest mode. Instead, modify your vcpkg.json and run install.",
"UpgradeRunWithNoDryRun": "If you are sure you want to rebuild the above packages, run this command with the --no-dry-run option.",
"UploadedBinaries": "Uploaded binaries to {count} {vendor}.",
"_UploadedBinaries.comment": "An example of {count} is 42. An example of {vendor} is Azure.",
"UploadedPackagesToVendor": "Uploaded {count} package(s) to {vendor} in {elapsed}",
"_UploadedPackagesToVendor.comment": "An example of {count} is 42. An example of {elapsed} is 3.532 min. An example of {vendor} is Azure.",
"UploadingBinariesToVendor": "Uploading binaries for '{spec}' to '{vendor}' source \"{path}\".",
"_UploadingBinariesToVendor.comment": "An example of {spec} is zlib:x64-windows. An example of {vendor} is Azure. An example of {path} is /foo/bar.",
"UploadingBinariesUsingVendor": "Uploading binaries for '{spec}' using '{vendor}' \"{path}\".",
Expand Down Expand Up @@ -1512,6 +1510,8 @@
"VersionTableHeader": "Version",
"VersionVerifiedOK": "OK: {package_name}@{version} -> {commit_sha}",
"_VersionVerifiedOK.comment": "An example of {package_name} is zlib. An example of {version} is 1.3.8. An example of {commit_sha} is 7cfad47ae9f68b183983090afd6337cd60fd4949.",
"WaitUntilPackagesUploaded": "Wait until the remaining packages ({count}) are uploaded",
"_WaitUntilPackagesUploaded.comment": "An example of {count} is 42.",
"WaitingForChildrenToExit": "Waiting for child processes to exit...",
"WaitingToTakeFilesystemLock": "waiting to take filesystem lock on {path}...",
"_WaitingToTakeFilesystemLock.comment": "An example of {path} is /foo/bar.",
Expand Down
14 changes: 9 additions & 5 deletions src/vcpkg-test/binarycaching.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,21 @@ struct KnowNothingBinaryProvider : IBinaryProvider
return RestoreResult::unavailable;
}

virtual void push_success(const InstallPlanAction& action) const override { CHECK(action.has_package_abi()); }
int push_success(const BinaryProviderPushRequest& request, MessageSink&) override
{
CHECK_FALSE(request.info.package_abi.empty());
return 0;
}

virtual void prefetch(View<InstallPlanAction> actions, View<CacheStatus* const> cache_status) const override
void prefetch(View<InstallPlanAction> actions, View<CacheStatus* const> cache_status) const override
{
REQUIRE(actions.size() == cache_status.size());
for (size_t idx = 0; idx < cache_status.size(); ++idx)
{
CHECK(actions[idx].has_package_abi() == (cache_status[idx] != nullptr));
}
}
virtual void precheck(View<InstallPlanAction> actions, View<CacheStatus* const> cache_status) const override
void precheck(View<InstallPlanAction> actions, View<CacheStatus* const> cache_status) const override
{
REQUIRE(actions.size() == cache_status.size());
for (const auto c : cache_status)
Expand Down Expand Up @@ -365,7 +369,7 @@ Features: a, b
TEST_CASE ("Provider nullptr checks", "[BinaryCache]")
{
// create a binary cache to test
BinaryCache uut;
BinaryCache uut(get_real_filesystem());
std::vector<std::unique_ptr<IBinaryProvider>> providers;
providers.emplace_back(std::make_unique<KnowNothingBinaryProvider>());
uut.install_providers(std::move(providers));
Expand All @@ -391,7 +395,7 @@ Version: 1.5
InstallPlanAction& ipa_without_abi = install_plan.back();

// test that the binary cache does the right thing. See also CHECKs etc. in KnowNothingBinaryProvider
uut.push_success(ipa_without_abi); // should have no effects
uut.push_success(ipa_without_abi, {}); // should have no effects
CHECK(uut.try_restore(ipa_without_abi) == RestoreResult::unavailable);
uut.prefetch(install_plan); // should have no effects
}
Expand Down
57 changes: 57 additions & 0 deletions src/vcpkg/base/message_sinks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,61 @@ namespace vcpkg
m_second.print(c, sv);
}

void BGMessageSink::print(Color c, StringView sv)
{
std::lock_guard<std::mutex> print_lk(m_print_directly_lock);
if (m_print_directly_to_out_sink)
{
out_sink.print(c, sv);
return;
}

std::string s = sv.to_string();
auto pos = s.find_last_of('\n');
if (pos != std::string::npos)
{
{
std::lock_guard<std::mutex> lk(m_lock);
m_published.insert(m_published.end(),
std::make_move_iterator(m_unpublished.begin()),
std::make_move_iterator(m_unpublished.end()));
m_published.emplace_back(c, s.substr(0, pos + 1));
}
m_unpublished.clear();
if (s.size() > pos + 1)
{
m_unpublished.emplace_back(c, s.substr(pos + 1));
}
}
else
{
m_unpublished.emplace_back(c, std::move(s));
}
}

void BGMessageSink::print_published()
{
std::lock_guard<std::mutex> lk(m_lock);
for (auto&& m : m_published)
{
out_sink.print(m.first, m.second);
}
m_published.clear();
}

void BGMessageSink::publish_directly_to_out_sink()
{
std::lock_guard<std::mutex> print_lk(m_print_directly_lock);
std::lock_guard<std::mutex> lk(m_lock);

m_print_directly_to_out_sink = true;
for (auto& messages : {&m_published, &m_unpublished})
{
for (auto&& m : *messages)
{
out_sink.print(m.first, m.second);
}
messages->clear();
}
}
}
Loading