From 9e3385bf87ade9f2d0e6dcea255bfadc49f8788e Mon Sep 17 00:00:00 2001 From: ttt161 Date: Wed, 20 Aug 2025 19:26:14 +0300 Subject: [PATCH 1/3] fix zombie collection --- .env | 2 +- docker-compose.yml | 10 ++-- src/prg_utils.erl | 5 ++ src/storage/postgres/prg_pg_backend.erl | 12 +++-- test/prg_base_SUITE.erl | 67 +++++++++++++++++++++++++ 5 files changed, 85 insertions(+), 11 deletions(-) diff --git a/.env b/.env index 80250d0..6233462 100644 --- a/.env +++ b/.env @@ -2,4 +2,4 @@ SERVICE_NAME=progressor OTP_VERSION=27.1.2 REBAR_VERSION=3.24 THRIFT_VERSION=0.14.2.3 -CONFLUENT_PLATFORM_VERSION=5.1.2 +CONFLUENT_PLATFORM_VERSION=7.2.15 diff --git a/docker-compose.yml b/docker-compose.yml index 111d5fe..39954e5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -30,9 +30,6 @@ services: POSTGRES_DB: "progressor_db" POSTGRES_USER: "progressor" POSTGRES_PASSWORD: "progressor" - PGDATA: "/tmp/postgresql/data/pgdata" - volumes: - - progressor-data:/tmp/postgresql/data ports: - "5432:5432" healthcheck: @@ -64,7 +61,8 @@ services: depends_on: - zookeeper healthcheck: - test: ["CMD", "kafka-topics", "--list", "--zookeeper", "zookeeper:2181"] + # TODO: FIX ME + test: exit 0 interval: 5s timeout: 10s retries: 5 @@ -87,5 +85,5 @@ services: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092 -volumes: - progressor-data: +#volumes: +# progressor-data: diff --git a/src/prg_utils.erl b/src/prg_utils.erl index 3f7304a..f42489e 100644 --- a/src/prg_utils.erl +++ b/src/prg_utils.erl @@ -7,6 +7,7 @@ -export([pipe/2]). -export([format/1]). -export([make_ns_opts/2]). +-export([unixtime_to_datetime/1]). -export([with_observe/3]). -export([with_observe/4]). @@ -40,6 +41,10 @@ make_ns_opts(NsId, NsOpts) -> Defaults = maps:merge(PresetDefaults, ConfigDefaults), maps:merge(Defaults, NsOpts). +-spec unixtime_to_datetime(timestamp_sec()) -> calendar:datetime(). 
+unixtime_to_datetime(TimestampSec) -> + calendar:gregorian_seconds_to_datetime(TimestampSec + ?EPOCH_DIFF). + -spec with_observe(_Fun, atom(), [list() | binary()]) -> any(). with_observe(Fun, MetricKey, Labels) -> with_observe(Fun, histogram, MetricKey, Labels). diff --git a/src/storage/postgres/prg_pg_backend.erl b/src/storage/postgres/prg_pg_backend.erl index c580ba5..16004de 100644 --- a/src/storage/postgres/prg_pg_backend.erl +++ b/src/storage/postgres/prg_pg_backend.erl @@ -218,15 +218,16 @@ collect_zombies(PgOpts, NsId, Timeout) -> #{ processes := ProcessesTable, tasks := TaskTable, + schedule := ScheduleTable, running := RunningTable } = prg_pg_utils:tables(NsId), NowSec = erlang:system_time(second), Now = unixtime_to_datetime(NowSec), TsBackward = unixtime_to_datetime(NowSec - (Timeout + ?PROTECT_TIMEOUT)), - epg_pool:transaction( + {ok, _, _} = epg_pool:transaction( Pool, fun(Connection) -> - {ok, _, _} = epg_pool:query( + epg_pool:query( Connection, "WITH zombie_tasks as (" " DELETE FROM " ++ RunningTable ++ @@ -234,9 +235,12 @@ collect_zombies(PgOpts, NsId, Timeout) -> " RETURNING process_id, task_id" " ), " " t1 AS (UPDATE " ++ TaskTable ++ - " SET status = 'cancelled' WHERE process_id IN (SELECT process_id FROM zombie_tasks))" + " SET status = 'cancelled' WHERE status IN ('waiting', 'blocked') " + " AND process_id IN (SELECT process_id FROM zombie_tasks))," " t2 AS (UPDATE " ++ TaskTable ++ - " SET status = 'error', finished_time = $2 WHERE task_id IN (SELECT task_id FROM zombie_tasks))" + " SET status = 'error', finished_time = $2 WHERE task_id IN (SELECT task_id FROM zombie_tasks))," + " t3 AS (DELETE FROM " ++ ScheduleTable ++ + " WHERE process_id IN (SELECT process_id FROM zombie_tasks))" "MERGE INTO " ++ ProcessesTable ++ " AS pt USING zombie_tasks AS zt ON pt.process_id = zt.process_id " " WHEN MATCHED THEN UPDATE SET" diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index 5c7f56b..b5ca447 100644 --- a/test/prg_base_SUITE.erl +++ 
b/test/prg_base_SUITE.erl @@ -28,6 +28,7 @@ -export([remove_by_timer_test/1]). -export([remove_without_timer_test/1]). -export([put_process_test/1]). +-export([put_process_zombie_test/1]). -export([put_process_with_timeout_test/1]). -export([put_process_with_remove_test/1]). @@ -72,6 +73,7 @@ groups() -> remove_by_timer_test, remove_without_timer_test, put_process_test, + put_process_zombie_test, put_process_with_timeout_test, put_process_with_remove_test ]}, @@ -682,6 +684,71 @@ put_process_with_timeout_test(C) -> unmock_processor(), ok. %% +-spec put_process_zombie_test(_) -> _. +put_process_zombie_test(C) -> + %% steps: + %% 1. put -> [event1], timer 1s + %% 2. insert running task from past + %% 3. zombie collection + Id = gen_id(), + Args = #{ + process => #{ + process_id => Id, + status => <<"running">>, + history => [event(1)] + } + }, + {ok, ok} = progressor:put(#{ns => ?NS(C), id => Id, args => Args}), + Now = erlang:system_time(second), + ZombieTs = prg_utils:unixtime_to_datetime(Now - 30), + NS = erlang:atom_to_list(?NS(C)), + %% TODO: rework it via storage backend + %% START SQL INJECTION + {ok, _, _, [{TaskId}]} = epg_pool:query( + default_pool, + "INSERT INTO \"" ++ NS ++ "_tasks\" " + " (process_id, task_type, status, scheduled_time, running_time, args, last_retry_interval, attempts_count)" + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING task_id", + [ + Id, + <<"timeout">>, + <<"running">>, + ZombieTs, + ZombieTs, + <<>>, + 0, + 0 + ] + ), + {ok, 1} = epg_pool:query( + default_pool, + "INSERT INTO \"" ++ NS ++ "_running\" " + " (task_id, process_id, task_type, status, scheduled_time, running_time, args, " + " last_retry_interval, attempts_count)" + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", + [ + TaskId, + Id, + <<"timeout">>, + <<"running">>, + ZombieTs, + ZombieTs, + <<>>, + 0, + 0 + ] + ), + %% END SQL INJECTION + + %% await zombie collection (process step timeout (10s) + random part (2s)) + timer:sleep(12010), + {ok, #{ + process_id := 
Id, + status := <<"error">>, + detail := <<"zombie detected">> + }} = progressor:get(#{ns => ?NS(C), id => Id}), + ok. +%% -spec put_process_with_remove_test(_) -> _. put_process_with_remove_test(C) -> %% steps: From 6ed2109f9c11bc2aec74bc2c57c2779b4f053bbe Mon Sep 17 00:00:00 2001 From: ttt161 Date: Wed, 20 Aug 2025 19:53:29 +0300 Subject: [PATCH 2/3] six formatting --- test/prg_base_SUITE.erl | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index b5ca447..29d6382 100644 --- a/test/prg_base_SUITE.erl +++ b/test/prg_base_SUITE.erl @@ -706,9 +706,10 @@ put_process_zombie_test(C) -> %% START SQL INJECTION {ok, _, _, [{TaskId}]} = epg_pool:query( default_pool, - "INSERT INTO \"" ++ NS ++ "_tasks\" " - " (process_id, task_type, status, scheduled_time, running_time, args, last_retry_interval, attempts_count)" - " VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING task_id", + "INSERT INTO \"" ++ NS ++ + "_tasks\" " + " (process_id, task_type, status, scheduled_time, running_time, args, last_retry_interval, attempts_count)" + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING task_id", [ Id, <<"timeout">>, @@ -722,10 +723,11 @@ put_process_zombie_test(C) -> ), {ok, 1} = epg_pool:query( default_pool, - "INSERT INTO \"" ++ NS ++ "_running\" " - " (task_id, process_id, task_type, status, scheduled_time, running_time, args, " - " last_retry_interval, attempts_count)" - " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", + "INSERT INTO \"" ++ NS ++ + "_running\" " + " (task_id, process_id, task_type, status, scheduled_time, running_time, args, " + " last_retry_interval, attempts_count)" + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", [ TaskId, Id, From 3dba21c44c0d8d3b5e0514c08ac1951cafd24be5 Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Thu, 21 Aug 2025 11:39:34 +0300 Subject: [PATCH 3/3] Fixes kafka healthcheck command --- docker-compose.yml | 6 +----- 1 file changed, 1 insertion(+), 5 
deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 39954e5..2708c6c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -61,8 +61,7 @@ services: depends_on: - zookeeper healthcheck: - # TODO: FIX ME - test: exit 0 + test: ["CMD", "kafka-topics", "--list", "--bootstrap-server", "localhost:9092"] interval: 5s timeout: 10s retries: 5 @@ -84,6 +83,3 @@ services: KAFKA_BROKER_ID: 3 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092 - -#volumes: -# progressor-data: