From a7608c3b46e87b40e04d235be6e4a65643e12b67 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Thu, 7 Aug 2025 18:20:22 +0300 Subject: [PATCH 1/2] TECH-224: add request options --- src/progressor.erl | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/progressor.erl b/src/progressor.erl index 750d680..ad7fd61 100644 --- a/src/progressor.erl +++ b/src/progressor.erl @@ -29,7 +29,8 @@ args => term(), idempotency_key => binary(), context => binary(), - range => history_range() + range => history_range(), + options => map() }. %% see receive blocks bellow in this module @@ -267,10 +268,10 @@ await_task_result(StorageOpts, NsId, KeyOrId, Timeout, Duration) -> ) end. -do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, range := HistoryRange}) -> - prg_storage:get_process(external, StorageOpts, NsId, Id, HistoryRange); -do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId}) -> - prg_storage:get_process(external, StorageOpts, NsId, Id, #{}). +do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, range := HistoryRange} = Req) -> + prg_storage:get_process(recipient(options(Req)), StorageOpts, NsId, Id, HistoryRange); +do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId} = Req) -> + prg_storage:get_process(recipient(options(Req)), StorageOpts, NsId, Id, #{}). do_put( #{ @@ -437,3 +438,13 @@ maybe_add_key(undefined, _Key, Map) -> Map; maybe_add_key(Value, Key, Map) -> Map#{Key => Value}. + +options(#{options := Opts}) -> + Opts; +options(_) -> + #{}. + +recipient(#{cache := ignore}) -> + internal; +recipient(_Req) -> + external. From db59d265d1adc7759fae720c70c7464b24dc255c Mon Sep 17 00:00:00 2001 From: ttt161 Date: Fri, 8 Aug 2025 08:00:09 +0300 Subject: [PATCH 2/2] TECH-224: fix range limit for cache --- src/storage/postgres/prg_pg_cache.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/storage/postgres/prg_pg_cache.erl b/src/storage/postgres/prg_pg_cache.erl index 1ae991b..b14845f 100644 --- a/src/storage/postgres/prg_pg_cache.erl +++ b/src/storage/postgres/prg_pg_cache.erl @@ -134,7 +134,7 @@ handle_info({timeout, _TRef, restart_replication}, State) -> else Error -> logger:error("Can`t restart replication with error: ~p", [Error]), - ReconnectTimeout = application:get_env(progressor, cache_reconnect_timeout, 5000), + ReconnectTimeout = application:get_env(progressor, cache_reconnect_timeout, ?DEFAULT_RECONNECT_TIMEOUT), erlang:start_timer(ReconnectTimeout, self(), restart_replication), {noreply, State} end; @@ -349,7 +349,7 @@ process_by_range(Events, #{offset := After} = Range) -> Sorted ), {lists:sublist(Filtered, Limit), LastEventID}; -process_by_range(Events, _Range) -> +process_by_range(Events, Range) -> [{_, #{<<"event_id">> := LastEventID}} | _] = Sorted = lists:sort( fun({_, #{<<"event_id">> := EventID1}}, {_, #{<<"event_id">> := EventID2}}) -> EventID1 < EventID2 end, @@ -359,7 +359,8 @@ process_by_range(Events, _Range) -> fun({_, Ev}) -> Ev end, Sorted ), - {History, LastEventID}. + Limit = maps:get(limit, Range, erlang:length(History)), + {lists:sublist(History, Limit), LastEventID}. convert_event(#{<<"timestamp">> := null} = Event) -> Event;