68 changes: 40 additions & 28 deletions Include/internal/pycore_backoff.h
@@ -22,33 +22,48 @@ extern "C" {
Another use is for the Tier 2 optimizer to decide when to create
a new Tier 2 trace (executor). Again, exponential backoff is used.

The 16-bit counter is structured as a 12-bit unsigned 'value'
and a 4-bit 'backoff' field. When resetting the counter, the
The 16-bit counter is structured as a 13-bit unsigned 'value'
and a 3-bit 'backoff' field. When resetting the counter, the
backoff field is incremented (until it reaches a limit) and the
value is set to a bit mask representing the value 2**backoff - 1.
The maximum backoff is 12 (the number of bits in the value).
value is set to a prime number minus 1.
The new value and backoff for each previous backoff are calculated
once at compile time and saved in the value_and_backoff_next table.
The maximum backoff is 6, since 7 is reserved as UNREACHABLE_BACKOFF.

There is an exceptional value which must not be updated, 0xFFFF.
*/

#define BACKOFF_BITS 4
#define MAX_BACKOFF 12
#define UNREACHABLE_BACKOFF 15

static inline bool
is_unreachable_backoff_counter(_Py_BackoffCounter counter)
{
return counter.value_and_backoff == UNREACHABLE_BACKOFF;
}
#define BACKOFF_BITS 3
#define BACKOFF_MASK 7
#define MAX_BACKOFF 6
#define UNREACHABLE_BACKOFF 7
#define MAX_VALUE 0x1FFF

#define MAKE_VALUE_AND_BACKOFF(value, backoff) \
((value << BACKOFF_BITS) | backoff)

// For a previous backoff b we use a value x such that
// x + 1 is a prime close to 2**(2*b+1).
static const uint16_t value_and_backoff_next[] = {
MAKE_VALUE_AND_BACKOFF(1, 1),
MAKE_VALUE_AND_BACKOFF(6, 2),
MAKE_VALUE_AND_BACKOFF(30, 3),
MAKE_VALUE_AND_BACKOFF(126, 4),
MAKE_VALUE_AND_BACKOFF(508, 5),
MAKE_VALUE_AND_BACKOFF(2052, 6),
// We use the same backoff counter for all backoffs >= MAX_BACKOFF.
MAKE_VALUE_AND_BACKOFF(8190, 6),
MAKE_VALUE_AND_BACKOFF(8190, 6),
};

static inline _Py_BackoffCounter
make_backoff_counter(uint16_t value, uint16_t backoff)
{
assert(backoff <= 15);
assert(value <= 0xFFF);
_Py_BackoffCounter result;
result.value_and_backoff = (value << BACKOFF_BITS) | backoff;
return result;
assert(backoff <= UNREACHABLE_BACKOFF);
assert(value <= MAX_VALUE);
return ((_Py_BackoffCounter){
.value_and_backoff = MAKE_VALUE_AND_BACKOFF(value, backoff)
});
}

static inline _Py_BackoffCounter
@@ -62,14 +77,11 @@ forge_backoff_counter(uint16_t counter)
static inline _Py_BackoffCounter
restart_backoff_counter(_Py_BackoffCounter counter)
{
assert(!is_unreachable_backoff_counter(counter));
int backoff = counter.value_and_backoff & 15;
if (backoff < MAX_BACKOFF) {
return make_backoff_counter((1 << (backoff + 1)) - 1, backoff + 1);
}
else {
return make_backoff_counter((1 << MAX_BACKOFF) - 1, MAX_BACKOFF);
}
uint16_t backoff = counter.value_and_backoff & BACKOFF_MASK;
assert(backoff <= MAX_BACKOFF);
return ((_Py_BackoffCounter){
.value_and_backoff = value_and_backoff_next[backoff]
});
}

static inline _Py_BackoffCounter
@@ -113,7 +125,7 @@ trigger_backoff_counter(void)
// as we always end up tracing the loop iteration's
// exhaustion iteration. Which aborts our current tracer.
#define JUMP_BACKWARD_INITIAL_VALUE 4000
#define JUMP_BACKWARD_INITIAL_BACKOFF 12
#define JUMP_BACKWARD_INITIAL_BACKOFF 6
static inline _Py_BackoffCounter
initial_jump_backoff_counter(void)
{
@@ -126,7 +138,7 @@ initial_jump_backoff_counter(void)
* otherwise when a side exit warms up we may construct
* a new trace before the Tier 1 code has properly re-specialized. */
#define SIDE_EXIT_INITIAL_VALUE 4000
#define SIDE_EXIT_INITIAL_BACKOFF 12
#define SIDE_EXIT_INITIAL_BACKOFF 6

static inline _Py_BackoffCounter
initial_temperature_backoff_counter(void)
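The new reset scheme is table-driven: for a previous backoff b, the next counter value x is chosen so that x + 1 is the prime nearest to 2**(2*b + 1), and the backoff saturates at MAX_BACKOFF. The following standalone sketch (not part of the PR; the helper names are made up) re-derives the table from that rule and checks it against the entries in the hunk above:

#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Simple trial-division primality test; the inputs here are small. */
static bool
is_prime(int n)
{
    if (n < 2) {
        return false;
    }
    for (int d = 2; d * d <= n; d++) {
        if (n % d == 0) {
            return false;
        }
    }
    return true;
}

/* Scan outwards from n; the first prime found is the nearest one. */
static int
nearest_prime(int n)
{
    for (int offset = 0; ; offset++) {
        if (is_prime(n - offset)) {
            return n - offset;
        }
        if (is_prime(n + offset)) {
            return n + offset;
        }
    }
}

int
main(void)
{
    /* Entries copied from value_and_backoff_next above: {value, backoff}. */
    static const int expected[8][2] = {
        {1, 1}, {6, 2}, {30, 3}, {126, 4},
        {508, 5}, {2052, 6}, {8190, 6}, {8190, 6},
    };
    for (int b = 0; b < 8; b++) {
        int eff = b < 6 ? b : 6;   /* entries for b >= MAX_BACKOFF repeat the last one */
        int value = nearest_prime(1 << (2 * eff + 1)) - 1;
        int backoff = (eff + 1 < 6) ? eff + 1 : 6;
        printf("prev backoff %d -> value %d, backoff %d\n", b, value, backoff);
        assert(value == expected[b][0]);
        assert(backoff == expected[b][1]);
    }
    return 0;
}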
84 changes: 57 additions & 27 deletions Include/internal/pycore_critical_section.h
@@ -32,7 +32,7 @@ extern "C" {
const bool _should_lock_cs = PyList_CheckExact(_orig_seq); \
PyCriticalSection _cs; \
if (_should_lock_cs) { \
_PyCriticalSection_Begin(&_cs, _orig_seq); \
PyCriticalSection_Begin(&_cs, _orig_seq); \
}

# define Py_END_CRITICAL_SECTION_SEQUENCE_FAST() \
@@ -77,10 +77,10 @@ _PyCriticalSection_Resume(PyThreadState *tstate);

// (private) slow path for locking the mutex
PyAPI_FUNC(void)
_PyCriticalSection_BeginSlow(PyCriticalSection *c, PyMutex *m);
_PyCriticalSection_BeginSlow(PyThreadState *tstate, PyCriticalSection *c, PyMutex *m);

PyAPI_FUNC(void)
_PyCriticalSection2_BeginSlow(PyCriticalSection2 *c, PyMutex *m1, PyMutex *m2,
_PyCriticalSection2_BeginSlow(PyThreadState *tstate, PyCriticalSection2 *c, PyMutex *m1, PyMutex *m2,
int is_m1_locked);

PyAPI_FUNC(void)
@@ -95,34 +95,30 @@ _PyCriticalSection_IsActive(uintptr_t tag)
}

static inline void
_PyCriticalSection_BeginMutex(PyCriticalSection *c, PyMutex *m)
_PyCriticalSection_BeginMutex(PyThreadState *tstate, PyCriticalSection *c, PyMutex *m)
{
if (PyMutex_LockFast(m)) {
PyThreadState *tstate = _PyThreadState_GET();
c->_cs_mutex = m;
c->_cs_prev = tstate->critical_section;
tstate->critical_section = (uintptr_t)c;
}
else {
_PyCriticalSection_BeginSlow(c, m);
_PyCriticalSection_BeginSlow(tstate, c, m);
}
}
#define PyCriticalSection_BeginMutex _PyCriticalSection_BeginMutex

static inline void
_PyCriticalSection_Begin(PyCriticalSection *c, PyObject *op)
_PyCriticalSection_Begin(PyThreadState *tstate, PyCriticalSection *c, PyObject *op)
{
_PyCriticalSection_BeginMutex(c, &op->ob_mutex);
_PyCriticalSection_BeginMutex(tstate, c, &op->ob_mutex);
}
#define PyCriticalSection_Begin _PyCriticalSection_Begin

// Removes the top-most critical section from the thread's stack of critical
// sections. If the new top-most critical section is inactive, then it is
// resumed.
static inline void
_PyCriticalSection_Pop(PyCriticalSection *c)
_PyCriticalSection_Pop(PyThreadState *tstate, PyCriticalSection *c)
{
PyThreadState *tstate = _PyThreadState_GET();
uintptr_t prev = c->_cs_prev;
tstate->critical_section = prev;

@@ -132,7 +128,7 @@ _PyCriticalSection_Pop(PyCriticalSection *c)
}

static inline void
_PyCriticalSection_End(PyCriticalSection *c)
_PyCriticalSection_End(PyThreadState *tstate, PyCriticalSection *c)
{
// If the mutex is NULL, we used the fast path in
// _PyCriticalSection_BeginSlow for locks already held in the top-most
@@ -141,18 +137,17 @@ _PyCriticalSection_End(PyCriticalSection *c)
return;
}
PyMutex_Unlock(c->_cs_mutex);
_PyCriticalSection_Pop(c);
_PyCriticalSection_Pop(tstate, c);
}
#define PyCriticalSection_End _PyCriticalSection_End

static inline void
_PyCriticalSection2_BeginMutex(PyCriticalSection2 *c, PyMutex *m1, PyMutex *m2)
_PyCriticalSection2_BeginMutex(PyThreadState *tstate, PyCriticalSection2 *c, PyMutex *m1, PyMutex *m2)
{
if (m1 == m2) {
// If the two mutex arguments are the same, treat this as a critical
// section with a single mutex.
c->_cs_mutex2 = NULL;
_PyCriticalSection_BeginMutex(&c->_cs_base, m1);
_PyCriticalSection_BeginMutex(tstate, &c->_cs_base, m1);
return;
}

@@ -167,7 +162,6 @@ _PyCriticalSection2_BeginMutex(PyCriticalSection2 *c, PyMutex *m1, PyMutex *m2)

if (PyMutex_LockFast(m1)) {
if (PyMutex_LockFast(m2)) {
PyThreadState *tstate = _PyThreadState_GET();
c->_cs_base._cs_mutex = m1;
c->_cs_mutex2 = m2;
c->_cs_base._cs_prev = tstate->critical_section;
@@ -176,24 +170,22 @@ _PyCriticalSection2_BeginMutex(PyCriticalSection2 *c, PyMutex *m1, PyMutex *m2)
tstate->critical_section = p;
}
else {
_PyCriticalSection2_BeginSlow(c, m1, m2, 1);
_PyCriticalSection2_BeginSlow(tstate, c, m1, m2, 1);
}
}
else {
_PyCriticalSection2_BeginSlow(c, m1, m2, 0);
_PyCriticalSection2_BeginSlow(tstate, c, m1, m2, 0);
}
}
#define PyCriticalSection2_BeginMutex _PyCriticalSection2_BeginMutex

static inline void
_PyCriticalSection2_Begin(PyCriticalSection2 *c, PyObject *a, PyObject *b)
_PyCriticalSection2_Begin(PyThreadState *tstate, PyCriticalSection2 *c, PyObject *a, PyObject *b)
{
_PyCriticalSection2_BeginMutex(c, &a->ob_mutex, &b->ob_mutex);
_PyCriticalSection2_BeginMutex(tstate, c, &a->ob_mutex, &b->ob_mutex);
}
#define PyCriticalSection2_Begin _PyCriticalSection2_Begin

static inline void
_PyCriticalSection2_End(PyCriticalSection2 *c)
_PyCriticalSection2_End(PyThreadState *tstate, PyCriticalSection2 *c)
{
// if mutex1 is NULL, we used the fast path in
// _PyCriticalSection_BeginSlow for mutexes that are already held,
@@ -207,9 +199,8 @@ _PyCriticalSection2_End(PyCriticalSection2 *c)
PyMutex_Unlock(c->_cs_mutex2);
}
PyMutex_Unlock(c->_cs_base._cs_mutex);
_PyCriticalSection_Pop(&c->_cs_base);
_PyCriticalSection_Pop(tstate, &c->_cs_base);
}
#define PyCriticalSection2_End _PyCriticalSection2_End

static inline void
_PyCriticalSection_AssertHeld(PyMutex *mutex)
@@ -251,6 +242,45 @@ _PyCriticalSection_AssertHeldObj(PyObject *op)

#endif
}

#undef Py_BEGIN_CRITICAL_SECTION
# define Py_BEGIN_CRITICAL_SECTION(op) \
{ \
PyCriticalSection _py_cs; \
PyThreadState *_cs_tstate = _PyThreadState_GET(); \
_PyCriticalSection_Begin(_cs_tstate, &_py_cs, _PyObject_CAST(op))

#undef Py_BEGIN_CRITICAL_SECTION_MUTEX
# define Py_BEGIN_CRITICAL_SECTION_MUTEX(mutex) \
{ \
PyCriticalSection _py_cs; \
PyThreadState *_cs_tstate = _PyThreadState_GET(); \
_PyCriticalSection_BeginMutex(_cs_tstate, &_py_cs, mutex)

#undef Py_END_CRITICAL_SECTION
# define Py_END_CRITICAL_SECTION() \
_PyCriticalSection_End(_cs_tstate, &_py_cs); \
}

#undef Py_BEGIN_CRITICAL_SECTION2
# define Py_BEGIN_CRITICAL_SECTION2(a, b) \
{ \
PyCriticalSection2 _py_cs2; \
PyThreadState *_cs_tstate = _PyThreadState_GET(); \
_PyCriticalSection2_Begin(_cs_tstate, &_py_cs2, _PyObject_CAST(a), _PyObject_CAST(b))

#undef Py_BEGIN_CRITICAL_SECTION2_MUTEX
# define Py_BEGIN_CRITICAL_SECTION2_MUTEX(m1, m2) \
{ \
PyCriticalSection2 _py_cs2; \
PyThreadState *_cs_tstate = _PyThreadState_GET(); \
_PyCriticalSection2_BeginMutex(_cs_tstate, &_py_cs2, m1, m2)

#undef Py_END_CRITICAL_SECTION2
# define Py_END_CRITICAL_SECTION2() \
_PyCriticalSection2_End(_cs_tstate, &_py_cs2); \
}

#endif /* Py_GIL_DISABLED */

#ifdef __cplusplus
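The critical-section helpers now take the PyThreadState explicitly, and the Py_BEGIN_CRITICAL_SECTION family of macros above is redefined to fetch the thread state once per section and pass it down. A hedged usage sketch follows (my_append_twice and its arguments are hypothetical; the macros and PyList_Append are existing CPython APIs, and the internal header requires a Py_BUILD_CORE compilation):

#include "Python.h"
#include "pycore_critical_section.h"   /* internal header that defines the macros */

/* Hypothetical helper: lock `list` once and append `item` to it twice.
 * On the free-threaded build the macro locks list->ob_mutex; on the
 * default (GIL) build it reduces to a plain block. */
static int
my_append_twice(PyObject *list, PyObject *item)
{
    int rc;
    Py_BEGIN_CRITICAL_SECTION(list);   /* fetches the thread state, begins the section */
    rc = PyList_Append(list, item);
    if (rc == 0) {
        rc = PyList_Append(list, item);
    }
    Py_END_CRITICAL_SECTION();         /* ends the section begun above */
    return rc;
}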
Binary file removed Lib/data.bin
50 changes: 50 additions & 0 deletions Lib/test/test_free_threading/test_csv.py
@@ -0,0 +1,50 @@
import csv
import io
import unittest

from test.support import threading_helper
from test.support.threading_helper import run_concurrently


NTHREADS = 10


@threading_helper.requires_working_threading()
class TestCSV(unittest.TestCase):
def test_concurrent_reader_next(self):
input_rows = [f"{i},{i},{i}" for i in range(50)]
input_stream = io.StringIO("\n".join(input_rows))
reader = csv.reader(input_stream)
output_rows = []

def read_row():
for row in reader:
self.assertEqual(len(row), 3)
output_rows.append(",".join(row))

run_concurrently(worker_func=read_row, nthreads=NTHREADS)
self.assertSetEqual(set(input_rows), set(output_rows))

def test_concurrent_writer_writerow(self):
output_stream = io.StringIO()
writer = csv.writer(output_stream)
row_per_thread = 10
expected_rows = []

def write_row():
for i in range(row_per_thread):
writer.writerow([i, i, i])
expected_rows.append(f"{i},{i},{i}")

run_concurrently(worker_func=write_row, nthreads=NTHREADS)

# Rewind to the start of the stream and parse the rows
output_stream.seek(0)
output_rows = [line.strip() for line in output_stream.readlines()]

self.assertEqual(len(output_rows), NTHREADS * row_per_thread)
self.assertListEqual(sorted(output_rows), sorted(expected_rows))


if __name__ == "__main__":
unittest.main()
2 changes: 1 addition & 1 deletion Lib/test/test_opcache.py
@@ -590,7 +590,7 @@ def make_deferred_ref_count_obj():
class TestRacesDoNotCrash(TestBase):
# Careful with these. Bigger numbers have a higher chance of catching bugs,
# but you can also burn through a *ton* of type/dict/function versions:
ITEMS = 1000
ITEMS = 1400
LOOPS = 4
WRITERS = 2

10 changes: 10 additions & 0 deletions Lib/test/test_perf_profiler.py
@@ -160,6 +160,16 @@ def baz():
self.assertIn(f"py::bar_fork:{script}", child_perf_file_contents)
self.assertIn(f"py::baz_fork:{script}", child_perf_file_contents)

# The parent's map should not contain the child's symbols.
self.assertNotIn(f"py::foo_fork:{script}", perf_file_contents)
self.assertNotIn(f"py::bar_fork:{script}", perf_file_contents)
self.assertNotIn(f"py::baz_fork:{script}", perf_file_contents)

# The child's map should not contain the parent's symbols.
self.assertNotIn(f"py::foo:{script}", child_perf_file_contents)
self.assertNotIn(f"py::bar:{script}", child_perf_file_contents)
self.assertNotIn(f"py::baz:{script}", child_perf_file_contents)

@unittest.skipIf(support.check_bolt_optimized(), "fails on BOLT instrumented binaries")
def test_sys_api(self):
for define_eval_hook in (False, True):
1 change: 1 addition & 0 deletions Makefile.pre.in
@@ -2573,6 +2573,7 @@ LIBSUBDIRS= asyncio \
profile \
profiling profiling/sampling profiling/tracing \
profiling/sampling/_assets \
profiling/sampling/live_collector \
profiling/sampling/_vendor/d3/7.8.5 \
profiling/sampling/_vendor/d3-flame-graph/4.1.3 \
pydoc_data \
@@ -0,0 +1,2 @@
Make the csv module thread-safe on the :term:`free threaded <free threading>`
build.
@@ -0,0 +1,3 @@
Change the backoff counter to use prime numbers instead of powers of 2.
Use 3 bits for ``backoff`` and 13 bits for ``value``, which allows
values up to 8191. Patch by Mikhail Efimov.