Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion include/progressor.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
history => [event()],
corrupted_by => task_id(),
range => history_range(),
last_event_id => event_id()
last_event_id => event_id(),
running_task => task_id()
}.

-type task() :: #{
Expand Down Expand Up @@ -40,6 +41,27 @@
payload := binary()
}.

-type process_trace_unit() :: #{
task_id := task_id(),
task_type := task_type(),
task_status := task_status(),
scheduled := timestamp_sec(),
running => timestamp_sec(),
finished => timestamp_sec(),
args => binary(),
metadata => map(),
idempotency_key => binary(),
response => term(),
retry_interval => non_neg_integer(),
retry_attempts => non_neg_integer(),
event_id => event_id(),
event_timestamp => timestamp_sec(),
event_metadata => #{format => pos_integer()},
event_payload => binary()
}.

-type process_flat_trace() :: [process_trace_unit()].

%%%
%%% Config options
%%%
Expand Down
35 changes: 20 additions & 15 deletions priv/schemas/postgres-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ CREATE TYPE process_status AS ENUM ('running', 'error');
CREATE TYPE task_status AS ENUM ('waiting', 'running', 'blocked', 'error', 'finished', 'cancelled');
CREATE TYPE task_type AS ENUM ('init', 'timeout', 'call', 'notify', 'repair', 'remove');

CREATE TABLE IF NOT EXISTS default_processes(
CREATE TABLE IF NOT EXISTS namespace_processes(
"process_id" VARCHAR(80) PRIMARY KEY,
"status" process_status NOT NULL,
"detail" TEXT,
"aux_state" BYTEA,
"created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
"metadata" JSONB
);

CREATE TABLE IF NOT EXISTS default_tasks(
CREATE TABLE IF NOT EXISTS namespace_tasks(
"task_id" BIGSERIAL PRIMARY KEY,
"process_id" VARCHAR(80) NOT NULL,
"process_id" VARCHAR(256) NOT NULL,
"task_type" task_type NOT NULL,
"status" task_status NOT NULL,
"scheduled_time" TIMESTAMP WITH TIME ZONE NOT NULL,
Expand All @@ -22,15 +23,16 @@ CREATE TABLE IF NOT EXISTS default_tasks(
"metadata" JSONB,
"idempotency_key" VARCHAR(80) UNIQUE,
"response" BYTEA,
"blocked_task" BIGINT REFERENCES namespace_tasks("task_id"),
"last_retry_interval" INTEGER NOT NULL,
"attempts_count" SMALLINT NOT NULL,
"context" BYTEA,
FOREIGN KEY ("process_id") REFERENCES default_processes ("process_id")
FOREIGN KEY ("process_id") REFERENCES namespace_processes ("process_id")
);

ALTER TABLE default_processes ADD COLUMN IF NOT EXISTS "corrupted_by" BIGINT REFERENCES default_tasks("task_id");
ALTER TABLE namespace_processes ADD COLUMN IF NOT EXISTS "corrupted_by" BIGINT REFERENCES namespace_tasks("task_id");

CREATE TABLE IF NOT EXISTS default_schedule(
CREATE TABLE IF NOT EXISTS namespace_schedule(
"task_id" BIGINT PRIMARY KEY,
"process_id" VARCHAR(80) NOT NULL,
"task_type" task_type NOT NULL,
Expand All @@ -41,11 +43,11 @@ CREATE TABLE IF NOT EXISTS default_schedule(
"last_retry_interval" INTEGER NOT NULL,
"attempts_count" SMALLINT NOT NULL,
"context" BYTEA,
FOREIGN KEY ("process_id") REFERENCES default_processes ("process_id"),
FOREIGN KEY ("task_id") REFERENCES "default_tasks" ("task_id")
FOREIGN KEY ("process_id") REFERENCES namespace_processes ("process_id"),
FOREIGN KEY ("task_id") REFERENCES "namespace_tasks" ("task_id")
);

CREATE TABLE IF NOT EXISTS default_running(
CREATE TABLE IF NOT EXISTS namespace_running(
"process_id" VARCHAR(80) PRIMARY KEY,
"task_id" BIGINT NOT NULL,
"task_type" task_type NOT NULL,
Expand All @@ -57,20 +59,23 @@ CREATE TABLE IF NOT EXISTS default_running(
"last_retry_interval" INTEGER NOT NULL,
"attempts_count" SMALLINT NOT NULL,
"context" BYTEA,
FOREIGN KEY ("process_id") REFERENCES default_processes ("process_id"),
FOREIGN KEY ("task_id") REFERENCES "default_tasks" ("task_id")
FOREIGN KEY ("process_id") REFERENCES namespace_processes ("process_id"),
FOREIGN KEY ("task_id") REFERENCES "namespace_tasks" ("task_id")
);

CREATE TABLE IF NOT EXISTS default_events(
CREATE TABLE IF NOT EXISTS namespace_events(
"process_id" VARCHAR(80) NOT NULL,
"task_id" BIGINT NOT NULL,
"event_id" SMALLINT NOT NULL,
"timestamp" TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
"metadata" JSONB,
"payload" BYTEA NOT NULL,
PRIMARY KEY ("process_id", "event_id"),
FOREIGN KEY ("process_id") REFERENCES default_processes ("process_id"),
FOREIGN KEY ("task_id") REFERENCES default_tasks ("task_id")
FOREIGN KEY ("process_id") REFERENCES namespace_processes ("process_id"),
FOREIGN KEY ("task_id") REFERENCES namespace_tasks ("task_id")
);

CREATE INDEX IF NOT EXISTS "process_idx" on default_events USING HASH ("process_id");
CREATE INDEX IF NOT EXISTS "process_idx" on namespace_events USING HASH ("process_id");
CREATE INDEX IF NOT EXISTS "process_idx" on namespace_tasks USING HASH ("process_id");
CREATE INDEX IF NOT EXISTS "process_idx" on namespace_schedule USING HASH ("process_id");
CREATE INDEX IF NOT EXISTS "task_idx" on namespace_running USING HASH ("task_id");
11 changes: 11 additions & 0 deletions src/prg_storage.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
-export([prepare_call/4]).
-export([prepare_repair/4]).
-export([put_process_data/4]).
-export([process_trace/3]).
-export([get_process_with_running/4]).

%% scan functions
-export([search_timers/4]).
Expand Down Expand Up @@ -64,6 +66,15 @@ get_process_status(#{client := Handler, options := HandlerOpts}, NsId, Id) ->
put_process_data(#{client := Handler, options := HandlerOpts}, NsId, Id, ProcessData) ->
Handler:put_process_data(HandlerOpts, NsId, Id, ProcessData).

-spec process_trace(storage_opts(), namespace_id(), id()) -> {ok, _Result} | {error, _Reason}.
process_trace(#{client := Handler, options := HandlerOpts}, NsId, Id) ->
Handler:process_trace(HandlerOpts, NsId, Id).

-spec get_process_with_running(storage_opts(), namespace_id(), id(), history_range()) ->
{ok, process()} | {error, _Reason}.
get_process_with_running(#{client := Handler, options := HandlerOpts}, NsId, ProcessId, HistoryRange) ->
Handler:get_process_with_running(HandlerOpts, NsId, ProcessId, HistoryRange).

-spec prepare_init(storage_opts(), namespace_id(), id(), task()) ->
{ok, {postpone, task_id()} | {continue, task_id()}} | {error, _Reason}.
prepare_init(#{client := Handler, options := HandlerOpts}, NsId, ProcessId, InitTask) ->
Expand Down
8 changes: 0 additions & 8 deletions src/prg_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,6 @@ handle_result(
Pid, Deadline, NsOpts, extract_task_type(TaskHeader), ProcessId
),
ok = prg_worker_sidecar:event_sink(Pid, Deadline, NsOpts, ProcessId, Events),
%% just for tests
ok = maybe_wait_call(application:get_env(progressor, call_wait_timeout, undefined)),
%%
SaveResult = prg_worker_sidecar:complete_and_continue(
Pid,
Deadline,
Expand Down Expand Up @@ -599,11 +596,6 @@ action_to_task_type(#{remove := true}) ->
action_to_task_type(#{set_timer := _}) ->
<<"timeout">>.

maybe_wait_call(undefined) ->
ok;
maybe_wait_call(Timeout) ->
timer:sleep(Timeout).

last_event_id([]) ->
0;
last_event_id(History) ->
Expand Down
59 changes: 47 additions & 12 deletions src/progressor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
-export([simple_repair/1]).
-export([get/1]).
-export([put/1]).
-export([trace/1]).
-export([health_check/1]).
%% TODO
%% -export([remove/1]).
Expand All @@ -30,7 +31,8 @@
idempotency_key => binary(),
context => binary(),
range => history_range(),
options => map()
options => map(),
_ => _
}.

%% see receive blocks bellow in this module
Expand Down Expand Up @@ -114,6 +116,16 @@ put(Req) ->
Req
).

-spec trace(request()) -> {ok, _Result} | {error, _Reason}.
trace(Req) ->
prg_utils:pipe(
[
fun add_ns_opts/1,
fun do_trace/1
],
Req
).

%% Details term must be json compatible for jsx encode/decode
-spec health_check([namespace_id()]) -> {Status :: passing | critical, Details :: term()}.
health_check(Namespaces) ->
Expand Down Expand Up @@ -268,10 +280,33 @@ await_task_result(StorageOpts, NsId, KeyOrId, Timeout, Duration) ->
)
end.

do_get(
#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, range := HistoryRange, running_task := TaskId} = Req
) ->
%% retry clause
case prg_storage:get_process_with_running(StorageOpts, NsId, Id, HistoryRange) of
{ok, #{running_task := TaskId}} ->
%% same task_id, await task finished
timer:sleep(1000),
do_get(Req);
Result ->
%% previous task finished, return result
Result
end;
do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, range := HistoryRange} = Req) ->
prg_storage:get_process(recipient(options(Req)), StorageOpts, NsId, Id, HistoryRange);
do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId} = Req) ->
prg_storage:get_process(recipient(options(Req)), StorageOpts, NsId, Id, #{}).
case prg_storage:get_process_with_running(StorageOpts, NsId, Id, HistoryRange) of
{ok, #{running_task := TaskId}} ->
%% some task running, sleep and retry
timer:sleep(1000),
do_get(Req#{running_task => TaskId});
Result ->
Result
end;
do_get(Req) ->
do_get(Req#{range => #{}}).

do_trace(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId}) ->
prg_storage:process_trace(StorageOpts, NsId, Id).

do_put(
#{
Expand Down Expand Up @@ -439,12 +474,12 @@ maybe_add_key(undefined, _Key, Map) ->
maybe_add_key(Value, Key, Map) ->
Map#{Key => Value}.

options(#{options := Opts}) ->
Opts;
options(_) ->
#{}.
%options(#{options := Opts}) ->
% Opts;
%options(_) ->
% #{}.

recipient(#{cache := ignore}) ->
internal;
recipient(_Req) ->
external.
%recipient(#{cache := ignore}) ->
% internal;
%recipient(_Req) ->
% external.
Loading
Loading