@@ -25,7 +25,63 @@ class IsolationLevel:
2525 AUTOCOMMIT = "AUTOCOMMIT"
2626
2727
28- class Connection :
28+ class BaseConnection :
29+ _tx_mode : ydb .BaseQueryTxMode = ydb .QuerySerializableReadWrite ()
30+ _tx_context : ydb .QueryTxContext | ydb .aio .QueryTxContext | None = None
31+ interactive_transaction : bool = False
32+
33+ def set_isolation_level (self , isolation_level : str ) -> None :
34+ class IsolationSettings (NamedTuple ):
35+ ydb_mode : ydb .BaseQueryTxMode
36+ interactive : bool
37+
38+ ydb_isolation_settings_map = {
39+ IsolationLevel .AUTOCOMMIT : IsolationSettings (
40+ ydb .QuerySerializableReadWrite (), interactive = False
41+ ),
42+ IsolationLevel .SERIALIZABLE : IsolationSettings (
43+ ydb .QuerySerializableReadWrite (), interactive = True
44+ ),
45+ IsolationLevel .ONLINE_READONLY : IsolationSettings (
46+ ydb .QueryOnlineReadOnly (), interactive = True
47+ ),
48+ IsolationLevel .ONLINE_READONLY_INCONSISTENT : IsolationSettings (
49+ ydb .QueryOnlineReadOnly ().with_allow_inconsistent_reads (),
50+ interactive = True ,
51+ ),
52+ IsolationLevel .STALE_READONLY : IsolationSettings (
53+ ydb .QueryStaleReadOnly (), interactive = True
54+ ),
55+ IsolationLevel .SNAPSHOT_READONLY : IsolationSettings (
56+ ydb .QuerySnapshotReadOnly (), interactive = True
57+ ),
58+ }
59+ ydb_isolation_settings = ydb_isolation_settings_map [isolation_level ]
60+ if self ._tx_context and self ._tx_context .tx_id :
61+ raise InternalError (
62+ "Failed to set transaction mode: transaction is already began"
63+ )
64+ self ._tx_mode = ydb_isolation_settings .ydb_mode
65+ self .interactive_transaction = ydb_isolation_settings .interactive
66+
67+ def get_isolation_level (self ) -> str :
68+ if self ._tx_mode .name == ydb .QuerySerializableReadWrite ().name :
69+ if self .interactive_transaction :
70+ return IsolationLevel .SERIALIZABLE
71+ return IsolationLevel .AUTOCOMMIT
72+ if self ._tx_mode .name == ydb .QueryOnlineReadOnly ().name :
73+ if self ._tx_mode .settings .allow_inconsistent_reads :
74+ return IsolationLevel .ONLINE_READONLY_INCONSISTENT
75+ return IsolationLevel .ONLINE_READONLY
76+ if self ._tx_mode .name == ydb .QueryStaleReadOnly ().name :
77+ return IsolationLevel .STALE_READONLY
78+ if self ._tx_mode .name == ydb .QuerySnapshotReadOnly ().name :
79+ return IsolationLevel .SNAPSHOT_READONLY
80+ msg = f"{ self ._tx_mode .name } is not supported"
81+ raise NotSupportedError (msg )
82+
83+
84+ class Connection (BaseConnection ):
2985 def __init__ (
3086 self ,
3187 host : str = "" ,
@@ -80,56 +136,6 @@ def wait_ready(self, timeout: int = 10) -> None:
80136
81137 self ._session = self ._session_pool .acquire ()
82138
83- def set_isolation_level (self , isolation_level : str ) -> None :
84- class IsolationSettings (NamedTuple ):
85- ydb_mode : ydb .BaseQueryTxMode
86- interactive : bool
87-
88- ydb_isolation_settings_map = {
89- IsolationLevel .AUTOCOMMIT : IsolationSettings (
90- ydb .QuerySerializableReadWrite (), interactive = False
91- ),
92- IsolationLevel .SERIALIZABLE : IsolationSettings (
93- ydb .QuerySerializableReadWrite (), interactive = True
94- ),
95- IsolationLevel .ONLINE_READONLY : IsolationSettings (
96- ydb .QueryOnlineReadOnly (), interactive = True
97- ),
98- IsolationLevel .ONLINE_READONLY_INCONSISTENT : IsolationSettings (
99- ydb .QueryOnlineReadOnly ().with_allow_inconsistent_reads (),
100- interactive = True ,
101- ),
102- IsolationLevel .STALE_READONLY : IsolationSettings (
103- ydb .QueryStaleReadOnly (), interactive = True
104- ),
105- IsolationLevel .SNAPSHOT_READONLY : IsolationSettings (
106- ydb .QuerySnapshotReadOnly (), interactive = True
107- ),
108- }
109- ydb_isolation_settings = ydb_isolation_settings_map [isolation_level ]
110- if self ._tx_context and self ._tx_context .tx_id :
111- raise InternalError (
112- "Failed to set transaction mode: transaction is already began"
113- )
114- self ._tx_mode = ydb_isolation_settings .ydb_mode
115- self .interactive_transaction = ydb_isolation_settings .interactive
116-
117- def get_isolation_level (self ) -> str :
118- if self ._tx_mode .name == ydb .QuerySerializableReadWrite ().name :
119- if self .interactive_transaction :
120- return IsolationLevel .SERIALIZABLE
121- return IsolationLevel .AUTOCOMMIT
122- if self ._tx_mode .name == ydb .QueryOnlineReadOnly ().name :
123- if self ._tx_mode .settings .allow_inconsistent_reads :
124- return IsolationLevel .ONLINE_READONLY_INCONSISTENT
125- return IsolationLevel .ONLINE_READONLY
126- if self ._tx_mode .name == ydb .QueryStaleReadOnly ().name :
127- return IsolationLevel .STALE_READONLY
128- if self ._tx_mode .name == ydb .QuerySnapshotReadOnly ().name :
129- return IsolationLevel .SNAPSHOT_READONLY
130- msg = f"{ self ._tx_mode .name } is not supported"
131- raise NotSupportedError (msg )
132-
133139 def cursor (self ) -> Cursor :
134140 if self ._session is None :
135141 raise RuntimeError ("Connection is not ready, use wait_ready." )
@@ -220,7 +226,7 @@ def callee() -> ydb.Directory:
220226 return result
221227
222228
223- class AsyncConnection :
229+ class AsyncConnection ( BaseConnection ) :
224230 def __init__ (
225231 self ,
226232 host : str = "" ,
@@ -275,56 +281,6 @@ async def wait_ready(self, timeout: int = 10) -> None:
275281
276282 self ._session = await self ._session_pool .acquire ()
277283
278- def set_isolation_level (self , isolation_level : str ) -> None :
279- class IsolationSettings (NamedTuple ):
280- ydb_mode : ydb .BaseQueryTxMode
281- interactive : bool
282-
283- ydb_isolation_settings_map = {
284- IsolationLevel .AUTOCOMMIT : IsolationSettings (
285- ydb .QuerySerializableReadWrite (), interactive = False
286- ),
287- IsolationLevel .SERIALIZABLE : IsolationSettings (
288- ydb .QuerySerializableReadWrite (), interactive = True
289- ),
290- IsolationLevel .ONLINE_READONLY : IsolationSettings (
291- ydb .QueryOnlineReadOnly (), interactive = True
292- ),
293- IsolationLevel .ONLINE_READONLY_INCONSISTENT : IsolationSettings (
294- ydb .QueryOnlineReadOnly ().with_allow_inconsistent_reads (),
295- interactive = True ,
296- ),
297- IsolationLevel .STALE_READONLY : IsolationSettings (
298- ydb .QueryStaleReadOnly (), interactive = True
299- ),
300- IsolationLevel .SNAPSHOT_READONLY : IsolationSettings (
301- ydb .QuerySnapshotReadOnly (), interactive = True
302- ),
303- }
304- ydb_isolation_settings = ydb_isolation_settings_map [isolation_level ]
305- if self ._tx_context and self ._tx_context .tx_id :
306- raise InternalError (
307- "Failed to set transaction mode: transaction is already began"
308- )
309- self ._tx_mode = ydb_isolation_settings .ydb_mode
310- self .interactive_transaction = ydb_isolation_settings .interactive
311-
312- def get_isolation_level (self ) -> str :
313- if self ._tx_mode .name == ydb .QuerySerializableReadWrite ().name :
314- if self .interactive_transaction :
315- return IsolationLevel .SERIALIZABLE
316- return IsolationLevel .AUTOCOMMIT
317- if self ._tx_mode .name == ydb .QueryOnlineReadOnly ().name :
318- if self ._tx_mode .settings .allow_inconsistent_reads :
319- return IsolationLevel .ONLINE_READONLY_INCONSISTENT
320- return IsolationLevel .ONLINE_READONLY
321- if self ._tx_mode .name == ydb .QueryStaleReadOnly ().name :
322- return IsolationLevel .STALE_READONLY
323- if self ._tx_mode .name == ydb .QuerySnapshotReadOnly ().name :
324- return IsolationLevel .SNAPSHOT_READONLY
325- msg = f"{ self ._tx_mode .name } is not supported"
326- raise NotSupportedError (msg )
327-
328284 def cursor (self ) -> AsyncCursor :
329285 if self ._session is None :
330286 raise RuntimeError ("Connection is not ready, use wait_ready." )
0 commit comments