Skip to content

Commit 7ab39fb

Browse files
Jaesoo Leefacebook-github-bot
authored andcommitted
added inject_pause framework for unit testing
Summary: This change adds the inject_pause points which provide a mechanism to do a rendezvous style synchronization. This is required for unit testing to control the executions of the code from the unit tests or install hooks to insert some custom callbacks in the code path. Reviewed By: therealgymmy Differential Revision: D48706614 fbshipit-source-id: 44b4bfa5ce88b53c096c907310e494a066e7dc1a
1 parent 213af2e commit 7ab39fb

File tree

2 files changed

+384
-0
lines changed

2 files changed

+384
-0
lines changed

cachelib/common/inject_pause.cpp

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "inject_pause.h"
18+
19+
#include <folly/Indestructible.h>
20+
#include <folly/Synchronized.h>
21+
#include <folly/fibers/Baton.h>
22+
#include <folly/logging/xlog.h>
23+
24+
#include <condition_variable>
25+
#include <deque>
26+
#include <mutex>
27+
#include <string>
28+
29+
namespace facebook {
30+
namespace cachelib {
31+
32+
namespace {
33+
struct BatonSet {
34+
std::deque<folly::fibers::Baton*> batons;
35+
std::condition_variable cv;
36+
bool enabled{false};
37+
size_t limit{0};
38+
std::shared_ptr<PauseCallback> callback;
39+
};
40+
41+
using PausePointsMap =
42+
folly::Synchronized<std::unordered_map<std::string, BatonSet>, std::mutex>;
43+
PausePointsMap& pausePoints() {
44+
static folly::Indestructible<PausePointsMap> pausePoints;
45+
return *pausePoints;
46+
}
47+
48+
// Posts the batons of up to `numThreads` parked threads, oldest first, and
// removes them from `batonSet`. A `numThreads` of 0 means "everyone currently
// parked". Returns how many batons were posted.
size_t wakeupBatonSet(BatonSet& batonSet, size_t numThreads) {
  auto& parked = batonSet.batons;
  const size_t quota = (numThreads == 0) ? parked.size() : numThreads;

  size_t posted = 0;
  for (; posted < quota && !parked.empty(); ++posted) {
    folly::fibers::Baton* waiter = parked.front();
    parked.pop_front();
    waiter->post();
  }
  return posted;
}
63+
64+
} // namespace
65+
66+
// Master switch for the framework. Returns a mutable reference to a
// function-local flag that starts out false; every entry point in this file
// returns early while it is false.
bool& injectPauseEnabled() {
  static bool flag{false};
  return flag;
}
70+
71+
// Switch for the debug logging emitted by the functions in this file.
// Returns a mutable reference to a function-local flag that starts out false.
bool& injectPauseLogEnabled() {
  static bool flag{false};
  return flag;
}
76+
77+
namespace detail {
78+
79+
// Implementation behind the INJECT_PAUSE() macro.
//
// Does nothing unless the framework is enabled and the point `name` has been
// armed with injectPauseSet(). If a callback is registered for the point it is
// invoked (outside the registry lock) and the thread continues. Otherwise the
// thread parks on a baton until injectPauseWait() or injectPauseClear() posts
// it. When the point has a nonzero limit and that many threads are already
// parked, the caller passes straight through.
void injectPause(folly::StringPiece name) {
  if (!injectPauseEnabled()) {
    return;
  }
  folly::fibers::Baton baton;
  std::shared_ptr<PauseCallback> callback;
  {
    auto ptr = pausePoints().lock();
    auto it = ptr->find(name.str());
    // `>=` rather than `==`: the limit can be lowered by a later
    // injectPauseSet() while more threads than the new limit are parked; an
    // equality test would then never match and the limit would stop applying.
    if (it == ptr->end() || !it->second.enabled ||
        (it->second.limit > 0 &&
         it->second.batons.size() >= it->second.limit)) {
      if (injectPauseLogEnabled()) {
        XLOGF(ERR, "[{}] injectPause not set", name);
      }
      return;
    }
    if (injectPauseLogEnabled()) {
      XLOGF(ERR, "[{}] injectPause begin", name);
    }
    // Copy the shared_ptr so the callback stays alive after the lock is
    // dropped, even if the point is re-armed meanwhile.
    callback = it->second.callback;
    if (!callback) {
      it->second.batons.push_back(&baton);
      // Let a thread blocked in injectPauseWait() re-check the parked count.
      it->second.cv.notify_one();
    }
  }

  if (callback) {
    (*callback)();
  } else {
    baton.wait();
  }

  /* Avoid potential protect-its-own-lifetime bug:
     Wait for the post() to finish before destroying the Baton. */
  auto ptr = pausePoints().lock();
  if (injectPauseLogEnabled()) {
    XLOGF(ERR, "[{}] injectPause end", name);
  }
}
119+
120+
} // namespace detail
121+
122+
// Arms the pause point `name` in blocking mode: threads reaching
// INJECT_PAUSE(name) park until woken. A nonzero `numThreads` caps how many
// threads may be parked at once; 0 means no cap. No-op while the framework is
// disabled.
void injectPauseSet(folly::StringPiece name, size_t numThreads) {
  if (!injectPauseEnabled()) {
    return;
  }
  // Declared before the lock so that a callback dropped below is destroyed
  // only after the registry mutex has been released.
  std::shared_ptr<PauseCallback> staleCallback;
  auto ptr = pausePoints().lock();
  if (injectPauseLogEnabled()) {
    XLOGF(ERR, "[{}] injectPauseSet threads {}", name, numThreads);
  }
  auto res = ptr->emplace(
      std::piecewise_construct, std::make_tuple(name.str()), std::make_tuple());
  auto& batonSet = res.first->second;
  batonSet.limit = numThreads;
  batonSet.enabled = true;
  // Drop any callback left over from an earlier injectPauseSet(name, callback);
  // otherwise injectPause() would keep invoking it instead of blocking.
  staleCallback.swap(batonSet.callback);
}
135+
136+
// Arms the pause point `name` in callback mode: threads reaching
// INJECT_PAUSE(name) run `callback` and then continue without parking.
// The point's limit is reset to 0. No-op while the framework is disabled.
void injectPauseSet(folly::StringPiece name, PauseCallback&& callback) {
  if (injectPauseEnabled()) {
    auto locked = pausePoints().lock();
    if (injectPauseLogEnabled()) {
      XLOGF(ERR, "[{}] injectPauseSet callback", name);
    }
    // operator[] default-constructs the entry in place when it is missing.
    BatonSet& point = (*locked)[name.str()];
    point.limit = 0;
    point.enabled = true;
    point.callback = std::make_shared<PauseCallback>(std::move(callback));
  }
}
151+
152+
// Blocks until at least `numThreads` threads are parked at the pause point
// `name`, the point is cleared, or the timeout expires.
//
// @param name        pause point to wait on; must have been armed
// @param numThreads  number of parked threads to wait for; 0 returns false
// @param wakeup      when true and the wait succeeded, wake `numThreads`
//                    parked threads before returning
// @param timeoutMs   wait budget in milliseconds; 0 or anything larger than
//                    kInjectPauseMaxWaitTimeoutMs is replaced by that maximum
// @return false if the framework is disabled, `numThreads` is 0, the point is
//         not armed, or the wait timed out; true otherwise
//
// NOTE(review): the wait predicate is also satisfied when the point is
// disabled while waiting, so true is returned in that case as well.
bool injectPauseWait(folly::StringPiece name,
                     size_t numThreads,
                     bool wakeup,
                     uint32_t timeoutMs) {
  if (!injectPauseEnabled() || numThreads == 0) {
    return false;
  }

  auto locked = pausePoints().lock();
  if (injectPauseLogEnabled()) {
    XLOGF(ERR, "[{}] injectPauseWait begin {}", name, numThreads);
  }

  const auto found = locked->find(name.str());
  const bool armed = found != locked->end() && found->second.enabled;
  if (!armed) {
    if (injectPauseLogEnabled()) {
      XLOGF(ERR, "[{}] injectPauseWait: ERROR not set", name);
    }
    return false;
  }

  const bool useMax =
      timeoutMs == 0 || timeoutMs > kInjectPauseMaxWaitTimeoutMs;
  const std::chrono::milliseconds budget(
      useMax ? kInjectPauseMaxWaitTimeoutMs : timeoutMs);

  BatonSet& point = found->second;
  const auto ready = [&point, numThreads]() {
    return !point.enabled || point.batons.size() >= numThreads;
  };
  // The registry mutex is released while waiting and re-acquired on wakeup.
  const bool reached = point.cv.wait_for(locked.as_lock(), budget, ready);

  if (reached && wakeup) {
    wakeupBatonSet(point, numThreads);
  }

  if (injectPauseLogEnabled()) {
    std::string errStr;
    if (!reached) {
      errStr = fmt::format(" with ERR (paused {})", point.batons.size());
    }
    XLOGF(ERR, "[{}] injectPauseWait end {}", name, errStr);
  }
  return reached;
}
196+
197+
// Disarms pause points and releases everything blocked on them.
//
// With a non-empty `name`, only that point is cleared and the result is 1 if
// any thread was parked there, 0 otherwise. With an empty `name`, every point
// is cleared and the result is the total number of threads that were woken.
// Returns 0 while the framework is disabled.
//
// NOTE(review): the two branches count different things (points vs. threads);
// behaviour is kept as-is here. Confirm which one callers expect.
size_t injectPauseClear(folly::StringPiece name) {
  if (!injectPauseEnabled()) {
    return 0;
  }

  size_t numPaused = 0;
  auto ptr = pausePoints().lock();
  if (injectPauseLogEnabled()) {
    XLOGF(ERR, "[{}] injectPauseClear ", name);
  }

  // Wakes every thread parked at one point, disables the point and releases
  // any injectPauseWait() caller. Returns the number of threads woken.
  const auto clearPoint = [](BatonSet& batonSet) {
    const size_t numWaked = wakeupBatonSet(batonSet, 0);
    batonSet.enabled = false;
    batonSet.cv.notify_all();
    return numWaked;
  };

  if (name.empty()) {
    for (auto& entry : *ptr) {
      numPaused += clearPoint(entry.second);
    }
  } else {
    auto it = ptr->find(name.str());
    if (it != ptr->end()) {
      numPaused += clearPoint(it->second) > 0 ? 1 : 0;
    }
  }
  return numPaused;
}
226+
227+
} // namespace cachelib
228+
} // namespace facebook

cachelib/common/inject_pause.h

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <folly/Function.h>
20+
#include <folly/Range.h>
21+
22+
/*
23+
* inject_pause framework provides a rendezvous-style synchronization
24+
* for debugging and testing purposes. This allows us to control
25+
* executions of multiple threads deterministically.
26+
*
27+
* The basic idea is to have a named point in code where we want to
28+
* block execution of some threads. We then allow other threads,
29+
* probably, the test threads, to synchronize with the main threads
30+
* at the pause points and signal the threads to proceed.
31+
*
32+
* The basic usage is: (e.g., RegionManager.ReadWrite unit test)
33+
* 1. Insert a call to INJECT_PAUSE() in the code
34+
* void RegionManager::doReclaim() {
35+
* ...
36+
* INJECT_PAUSE(pause_reclaim_done);
37+
* }
38+
*
39+
* 2. Enable INJECT_PAUSE() using injectPauseEnabled() in unit test
40+
* TEST(RegionManager, ReadWrite) {
41+
* ...
42+
* injectPauseEnabled() = true;
43+
*
44+
* 3. In the test, we can wait for the threads to reach the pause point
45+
*
46+
* ...
47+
* rm->startReclaim();
48+
* EXPECT_TRUE(injectPauseWait("pause_reclaim_done"));
49+
* ASSERT_EQ(OpenStatus::Ready, rm->getCleanRegion(rid, false).first);
50+
* ASSERT_EQ(0, rid.index());
51+
*
52+
* 4. Once done, clear the pause point
53+
*
54+
* ASSERT_EQ(0, injectPauseClear("pause_reclaim_done"));
55+
* ...
56+
* }
57+
*
58+
* In the above example, the region reclaim thread and the test thread rendezvous
59+
* at the pause point through injectPauseWait(), after which both continue.
60+
*
61+
* The inject_pause framework also supports setting a callback function where
62+
* the threads can register a callback function to be called when they hit the
63+
* INJECT_PAUSE() point.
64+
*
65+
* The example usage with the callback function is as follows:
66+
*
67+
* folly::fibers::TimedMutex mutex;
68+
* bool reclaimStarted = false;
69+
* size_t numCleanRegions = 0;
70+
* util::ConditionVariable cv;
71+
*
72+
* injectPauseSet("pause_blockcache_clean_alloc", [&]() {
73+
* std::unique_lock<folly::fibers::TimedMutex> lk(mutex);
74+
* XDCHECK_GT(numCleanRegions, 0u);
75+
* numCleanRegions--;
76+
* });
77+
*
78+
* injectPauseSet("pause_blockcache_clean_free", [&]() {
79+
* std::unique_lock<folly::fibers::TimedMutex> lk(mutex);
80+
* if (numCleanRegions++ == 0u) {
81+
* cv.notifyAll();
82+
* }
83+
* reclaimStarted = true;
84+
* });
85+
*
86+
* injectPauseSet("pause_block_cache_insert_entry", [&]() {
87+
* std::unique_lock<folly::fibers::TimedMutex> lk(mutex);
88+
* if (numCleanRegions == 0u && reclaimStarted) {
89+
* cv.wait(lk);
90+
* }
91+
* });
92+
*
93+
* In the above example, the callbacks count the number of clean regions
94+
* and allows pause_blockcache_clean_alloc to continue only if there is
95+
* at least one clean region.
96+
*/
97+
namespace facebook {
98+
namespace cachelib {
99+
namespace detail {
100+
void injectPause(folly::StringPiece name);
101+
}
102+
103+
/**
 * Marks a named pause point in the code. The call is forwarded to
 * detail::injectPause() with the point's name.
 *
 * The argument is stringified by the preprocessor, so pass a bare
 * identifier, e.g. INJECT_PAUSE(pause_reclaim_done); the resulting name is
 * "pause_reclaim_done".
 */
#define INJECT_PAUSE(point) facebook::cachelib::detail::injectPause(#point)
107+
108+
// Callback that can be executed optionally for INJECT_PAUSE point
109+
using PauseCallback = folly::Function<void()>;
110+
111+
// Longest time injectPauseWait() will block, in milliseconds (60 seconds).
// Also the value used when the caller passes a timeout of 0.
constexpr uint32_t kInjectPauseMaxWaitTimeoutMs = 60 * 1000;
113+
114+
/**
115+
* Returns a reference to the global switch that turns all INJECT_PAUSE()
* logic on or off. The switch is off by default.
116+
*/
117+
bool& injectPauseEnabled();
118+
119+
/**
120+
* Stop any thread at this named INJECT_PAUSE point from now on.
121+
* @param numThreads If nonzero, hold at most this many threads at the point
*                   at a time; further arrivals pass through without stopping.
122+
*/
123+
void injectPauseSet(folly::StringPiece name, size_t numThreads = 0);
124+
125+
/**
126+
* The thread will execute the callback at INJECT_PAUSE point and go.
127+
* @param callback Callback to be executed at INJECT_PAUSE point
128+
*/
129+
void injectPauseSet(folly::StringPiece name, PauseCallback&& callback);
130+
131+
/**
132+
* If the named INJECT_PAUSE point was set, blocks until the requested
133+
* number of threads is stopped at it including those already stopped
134+
* and returns true.
135+
* @param name Name of the INJECT_PAUSE point
136+
* @param numThreads The number of threads to wait for
137+
* @param wakeup If true, wakes up numThreads of the threads blocked at the point
138+
* @param timeoutMs Maximum wait in milliseconds; zero, or any value above
*                  kInjectPauseMaxWaitTimeoutMs, means kInjectPauseMaxWaitTimeoutMs
139+
* @return true if waited for requested number of threads successfully
140+
* false otherwise or timed out
141+
*/
142+
bool injectPauseWait(folly::StringPiece name,
143+
size_t numThreads = 1,
144+
bool wakeup = true,
145+
uint32_t timeoutMs = 0);
146+
147+
/**
148+
* Stop blocking threads at this INJECT_PAUSE point and unblock any
149+
* currently waiting threads. If name is not given, all of
150+
* INJECT_PAUSE points are cleared.
151+
* Returns the number of INJECT_PAUSE points where threads were stopped
152+
*/
153+
size_t injectPauseClear(folly::StringPiece name = "");
154+
155+
} // namespace cachelib
156+
} // namespace facebook

0 commit comments

Comments
 (0)