diff --git a/.github/workflows/ci-database-test.yaml b/.github/workflows/ci-database-test.yaml new file mode 100644 index 0000000..d9c8d70 --- /dev/null +++ b/.github/workflows/ci-database-test.yaml @@ -0,0 +1,32 @@ +name: Database Integration Test +on: + pull_request: + push: + branches: + - 'main' + +jobs: + tests: + name: "Database Integration testing" + runs-on: ubuntu-latest + permissions: + # Give the default GITHUB_TOKEN write permission to commit and push the + # added or changed files to the repository. + contents: write + steps: + - uses: actions/checkout@v3 + - name: "Set up database environment" + run: docker compose -f docker/dev-all.yaml up -d + - name: "Test against cassandra" + run: cd server/ && make ci-cassandra-integ-test + - name: "Test against mongodb" + run: cd server/ && make ci-mongodb-integ-test + - name: "Test against mysql" + run: cd server/ && make ci-mysql-integ-test + - name: "Test against postgresql" + run: cd server/ && make ci-postgresql-integ-test + - name: "Test against dynamodb" + run: cd server/ && make ci-dynamodb-integ-test + - name: Dump docker logs + if: always() + uses: jwalton/gh-docker-logs@v2 \ No newline at end of file diff --git a/Makefile b/Makefile index 3f5b15b..60076ae 100644 --- a/Makefile +++ b/Makefile @@ -16,3 +16,4 @@ api-code-gen-server: #generate/refresh go server code for api.yaml, do this afte #api-code-gen-py: #generate/refresh python apis # rm -Rf ./pyapi/* ; true # java -jar openapi-generator-cli-7.14.0.jar generate -i api.yaml -g python -o ./pyapi -p packageVersion=0.0.3 -p packageName=xcherryapi --git-user-id xcherryio --git-repo-id apis + diff --git a/server/Makefile b/server/Makefile index 8208f79..f6fde8b 100644 --- a/server/Makefile +++ b/server/Makefile @@ -102,4 +102,42 @@ help: @echo " run - Build and run the server" @echo " dev - Run with live reload (requires air)" @echo " docker - Build Docker image" - @echo " help - Show this help message" \ No newline at end of file + @echo " help - Show this help message" + + +ci-cassandra-integ-test: + $Q go test -v ./databases/cassandra -cover -coverprofile coverage.cassandra.out -coverpkg ./databases/... + +ci-mongodb-integ-test: + $Q go test -v ./databases/mongodb -cover -coverprofile coverage.mongodb.out -coverpkg ./databases/... + +ci-mysql-integ-test: + $Q go test -v ./databases/mysql -cover -coverprofile coverage.mysql.out -coverpkg ./databases/... + +ci-postgresql-integ-test: + $Q go test -v ./databases/postgresql -cover -coverprofile coverage.postgresql.out -coverpkg ./databases/... + +ci-dynamodb-integ-test: + $Q go test -v ./databases/dynamodb -cover -coverprofile coverage.dynamodb.out -coverpkg ./databases/... + +integ-cassandra-test: + $Q go test -v ./databases/cassandra + +integ-mongodb-test: + $Q go test -v ./databases/mongodb + +integ-mysql-test: + $Q go test -v ./databases/mysql + +integ-postgresql-test: + $Q go test -v ./databases/postgresql + +integ-dynamodb-test: + $Q go test -v ./databases/dynamodb + +integ-test-all: + $Q go test -v ./databases/cassandra + $Q go test -v ./databases/mongodb + $Q go test -v ./databases/mysql + $Q go test -v ./databases/postgresql + $Q go test -v ./databases/dynamodb diff --git a/server/databases/cassandra/cassandra_timer_store_impl.go b/server/databases/cassandra/cassandra_timer_store_impl.go index 55e5a94..178dc3f 100644 --- a/server/databases/cassandra/cassandra_timer_store_impl.go +++ b/server/databases/cassandra/cassandra_timer_store_impl.go @@ -47,7 +47,7 @@ func (c *CassandraTimerStore) ClaimShardOwnership( ctx context.Context, shardId int, ownerId string, metadata interface{}, ) (shardVersion int64, retErr *databases.DbError) { // Convert ZeroUUID to high/low format for shard records - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Serialize metadata to JSON var metadataJSON string @@ -130,10 +130,10 @@ func (c *CassandraTimerStore) ClaimShardOwnership( func (c *CassandraTimerStore) CreateTimer(ctx context.Context, shardId int, shardVersion int64, namespace string, timer *databases.DbTimer) (err *databases.DbError) { // Convert the provided timer UUID to high/low format for predictable pagination - timerUuidHigh, timerUuidLow, _ := databases.UuidToHighLow(timer.TimerUuid) + timerUuidHigh, timerUuidLow := databases.UuidToHighLow(timer.TimerUuid) // Convert ZeroUUID to high/low format for shard records - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Serialize payload and retry policy to JSON var payloadJSON, retryPolicyJSON string @@ -210,7 +210,7 @@ func (c *CassandraTimerStore) CreateTimer(ctx context.Context, shardId int, shar func (c *CassandraTimerStore) CreateTimerNoLock(ctx context.Context, shardId int, namespace string, timer *databases.DbTimer) (err *databases.DbError) { // Convert the provided timer UUID to high/low format for predictable pagination - timerUuidHigh, timerUuidLow, _ := databases.UuidToHighLow(timer.TimerUuid) + timerUuidHigh, timerUuidLow := databases.UuidToHighLow(timer.TimerUuid) // Serialize payload and retry policy to JSON var payloadJSON, retryPolicyJSON string diff --git a/server/databases/cassandra/cassandra_timer_store_impl_create_timer_test.go b/server/databases/cassandra/cassandra_timer_store_impl_create_timer_test.go index 6499e11..9a85ec9 100644 --- a/server/databases/cassandra/cassandra_timer_store_impl_create_timer_test.go +++ b/server/databases/cassandra/cassandra_timer_store_impl_create_timer_test.go @@ -2,26 +2,16 @@ package cassandra import ( "context" - "crypto/md5" "fmt" "sync" "testing" "time" - "github.com/google/uuid" "github.com/iworkflowio/durable-timer/databases" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -// generateTimerUUID creates a stable UUID from timer namespace and ID for consistent upsert behavior -func generateTimerUUID(namespace, timerId string) uuid.UUID { - // Create a deterministic UUID based on namespace and timer ID - hash := md5.Sum([]byte(fmt.Sprintf("%s:%s", namespace, timerId))) - uuid, _ := uuid.FromBytes(hash[:]) - return uuid -} - func TestCreateTimer_Basic(t *testing.T) { store, cleanup := setupTestStore(t) defer cleanup() @@ -39,7 +29,7 @@ func TestCreateTimer_Basic(t *testing.T) { // Create a timer timer := &databases.DbTimer{ Id: "timer-1", - TimerUuid: generateTimerUUID(namespace, "timer-1"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-1"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -100,7 +90,7 @@ func TestCreateTimer_WithPayload(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-with-payload", - TimerUuid: generateTimerUUID(namespace, "timer-with-payload"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-with-payload"), Namespace: namespace, ExecuteAt: time.Now().Add(10 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -146,7 +136,7 @@ func TestCreateTimer_WithRetryPolicy(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-with-retry", - TimerUuid: generateTimerUUID(namespace, "timer-with-retry"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-with-retry"), Namespace: namespace, ExecuteAt: time.Now().Add(15 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -183,7 +173,7 @@ func TestCreateTimer_ShardVersionMismatch(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-version-mismatch", - TimerUuid: generateTimerUUID(namespace, "timer-version-mismatch"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-version-mismatch"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -232,7 +222,7 @@ func TestCreateTimer_ConcurrentCreation(t *testing.T) { defer wg.Done() timer := &databases.DbTimer{ Id: fmt.Sprintf("concurrent-timer-%d", idx), - TimerUuid: generateTimerUUID(namespace, fmt.Sprintf("concurrent-timer-%d", idx)), + TimerUuid: databases.GenerateTimerUUID(namespace, fmt.Sprintf("concurrent-timer-%d", idx)), Namespace: namespace, ExecuteAt: time.Now().Add(time.Duration(idx) * time.Minute), CallbackUrl: fmt.Sprintf("https://example.com/callback/%d", idx), @@ -277,7 +267,7 @@ func TestCreateTimerNoLock_Basic(t *testing.T) { // Create a basic timer without needing shard ownership timer := &databases.DbTimer{ Id: "timer-nolock-1", - TimerUuid: generateTimerUUID(namespace, "timer-nolock-1"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-nolock-1"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -334,7 +324,7 @@ func TestCreateTimerNoLock_WithPayload(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-nolock-payload", - TimerUuid: generateTimerUUID(namespace, "timer-nolock-payload"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-nolock-payload"), Namespace: namespace, ExecuteAt: time.Now().Add(10 * time.Minute), CallbackUrl: "https://example.com/nolock/callback", @@ -376,7 +366,7 @@ func TestCreateTimerNoLock_WithRetryPolicy(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-nolock-retry", - TimerUuid: generateTimerUUID(namespace, "timer-nolock-retry"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-nolock-retry"), Namespace: namespace, ExecuteAt: time.Now().Add(15 * time.Minute), CallbackUrl: "https://example.com/nolock/retry", @@ -410,7 +400,7 @@ func TestCreateTimerNoLock_NilPayloadAndRetryPolicy(t *testing.T) { // Create timer with nil payload and retry policy timer := &databases.DbTimer{ Id: "timer-nolock-nil-fields", - TimerUuid: generateTimerUUID(namespace, "timer-nolock-nil-fields"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-nolock-nil-fields"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/nolock/nil", @@ -450,7 +440,7 @@ func TestCreateTimerNoLock_InvalidPayloadSerialization(t *testing.T) { // Create timer with non-serializable payload (function type) timer := &databases.DbTimer{ Id: "timer-nolock-invalid-payload", - TimerUuid: generateTimerUUID(namespace, "timer-nolock-invalid-payload"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-nolock-invalid-payload"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/nolock/invalid", @@ -485,7 +475,7 @@ func TestCreateTimerNoLock_ConcurrentCreation(t *testing.T) { defer wg.Done() timer := &databases.DbTimer{ Id: fmt.Sprintf("concurrent-nolock-timer-%d", idx), - TimerUuid: generateTimerUUID(namespace, fmt.Sprintf("concurrent-nolock-timer-%d", idx)), + TimerUuid: databases.GenerateTimerUUID(namespace, fmt.Sprintf("concurrent-nolock-timer-%d", idx)), Namespace: namespace, ExecuteAt: time.Now().Add(time.Duration(idx) * time.Minute), CallbackUrl: fmt.Sprintf("https://example.com/nolock/callback/%d", idx), @@ -527,8 +517,8 @@ func TestCreateTimer_DuplicateTimerOverwrite(t *testing.T) { shardId := 1 namespace := "test_namespace" timerId := "duplicate-timer" - baseUuid := generateTimerUUID(namespace, timerId) - alternateUuid := generateTimerUUID(namespace, timerId+"_alt") + baseUuid := databases.GenerateTimerUUID(namespace, timerId) + alternateUuid := databases.GenerateTimerUUID(namespace, timerId+"_alt") // Create shard record ownerId := "owner-1" @@ -649,7 +639,7 @@ func TestCreateTimerNoLock_DuplicateTimerOverwrite(t *testing.T) { // Create initial timer originalTimer := &databases.DbTimer{ Id: "duplicate-timer-nolock", - TimerUuid: generateTimerUUID(namespace, "duplicate-timer-nolock"), + TimerUuid: databases.GenerateTimerUUID(namespace, "duplicate-timer-nolock"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://original-nolock.com/callback", @@ -676,8 +666,8 @@ func TestCreateTimerNoLock_DuplicateTimerOverwrite(t *testing.T) { // Create updated timer with same ID (should create new record due to different UUID) updatedTimer := &databases.DbTimer{ - Id: "duplicate-timer-nolock", // Same ID - TimerUuid: generateTimerUUID(namespace, "duplicate-timer-nolock-updated"), // Different UUID + Id: "duplicate-timer-nolock", // Same ID + TimerUuid: databases.GenerateTimerUUID(namespace, "duplicate-timer-nolock-updated"), // Different UUID Namespace: namespace, ExecuteAt: time.Now().Add(15 * time.Minute), // Different execution time CallbackUrl: "https://updated-nolock.com/callback", // Different callback diff --git a/server/databases/dynamodb/dynamodb_timer_store_impl_create_timer_test.go b/server/databases/dynamodb/dynamodb_timer_store_impl_create_timer_test.go index b21eb94..cf3a008 100644 --- a/server/databases/dynamodb/dynamodb_timer_store_impl_create_timer_test.go +++ b/server/databases/dynamodb/dynamodb_timer_store_impl_create_timer_test.go @@ -2,7 +2,6 @@ package dynamodb import ( "context" - "crypto/md5" "fmt" "strconv" "sync" @@ -12,20 +11,11 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "github.com/google/uuid" "github.com/iworkflowio/durable-timer/databases" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -// generateTimerUUID creates a stable UUID from timer namespace and ID for consistent upsert behavior -func generateTimerUUID(namespace, timerId string) uuid.UUID { - // Create a deterministic UUID based on namespace and timer ID - hash := md5.Sum([]byte(fmt.Sprintf("%s:%s", namespace, timerId))) - uuid, _ := uuid.FromBytes(hash[:]) - return uuid -} - func TestCreateTimer_Basic(t *testing.T) { store, cleanup := setupTestStore(t) defer cleanup() @@ -43,7 +33,7 @@ func TestCreateTimer_Basic(t *testing.T) { // Create a timer timer := &databases.DbTimer{ Id: "timer-1", - TimerUuid: generateTimerUUID(namespace, "timer-1"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-1"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -100,7 +90,7 @@ func TestCreateTimer_WithPayload(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-with-payload", - TimerUuid: generateTimerUUID(namespace, "timer-with-payload"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-with-payload"), Namespace: namespace, ExecuteAt: time.Now().Add(10 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -157,7 +147,7 @@ func TestCreateTimer_WithRetryPolicy(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-with-retry", - TimerUuid: generateTimerUUID(namespace, "timer-with-retry"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-with-retry"), Namespace: namespace, ExecuteAt: time.Now().Add(15 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -205,7 +195,7 @@ func TestCreateTimer_ShardVersionMismatch(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-version-mismatch", - TimerUuid: generateTimerUUID(namespace, "timer-version-mismatch"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-version-mismatch"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -263,7 +253,7 @@ func TestCreateTimer_ConcurrentCreation(t *testing.T) { defer wg.Done() timer := &databases.DbTimer{ Id: fmt.Sprintf("concurrent-timer-%d", idx), - TimerUuid: generateTimerUUID(namespace, fmt.Sprintf("concurrent-timer-%d", idx)), + TimerUuid: databases.GenerateTimerUUID(namespace, fmt.Sprintf("concurrent-timer-%d", idx)), Namespace: namespace, ExecuteAt: time.Now().Add(time.Duration(idx) * time.Minute), CallbackUrl: fmt.Sprintf("https://example.com/callback/%d", idx), @@ -314,7 +304,7 @@ func TestCreateTimer_NoShardRecord(t *testing.T) { // Don't claim the shard first - no shard record exists timer := &databases.DbTimer{ Id: "timer-no-shard", - TimerUuid: generateTimerUUID(namespace, "timer-no-shard"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-no-shard"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -341,7 +331,7 @@ func TestCreateTimerNoLock_Basic(t *testing.T) { // Create a basic timer without needing shard ownership timer := &databases.DbTimer{ Id: "timer-nolock-1", - TimerUuid: generateTimerUUID(namespace, "timer-nolock-1"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-nolock-1"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -394,7 +384,7 @@ func TestCreateTimerNoLock_WithPayload(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-nolock-payload", - TimerUuid: generateTimerUUID(namespace, "timer-nolock-payload"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-nolock-payload"), Namespace: namespace, ExecuteAt: time.Now().Add(10 * time.Minute), CallbackUrl: "https://example.com/nolock/callback", @@ -447,7 +437,7 @@ func TestCreateTimerNoLock_WithRetryPolicy(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-nolock-retry", - TimerUuid: generateTimerUUID(namespace, "timer-nolock-retry"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-nolock-retry"), Namespace: namespace, ExecuteAt: time.Now().Add(15 * time.Minute), CallbackUrl: "https://example.com/nolock/retry", @@ -492,7 +482,7 @@ func TestCreateTimerNoLock_NilPayloadAndRetryPolicy(t *testing.T) { // Create timer with nil payload and retry policy timer := &databases.DbTimer{ Id: "timer-nolock-nil-fields", - TimerUuid: generateTimerUUID(namespace, "timer-nolock-nil-fields"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-nolock-nil-fields"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/nolock/nil", @@ -540,7 +530,7 @@ func TestCreateTimerNoLock_InvalidPayloadSerialization(t *testing.T) { // Create timer with non-serializable payload (function type) timer := &databases.DbTimer{ Id: "timer-nolock-invalid-payload", - TimerUuid: generateTimerUUID(namespace, "timer-nolock-invalid-payload"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-nolock-invalid-payload"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/nolock/invalid", @@ -575,7 +565,7 @@ func TestCreateTimerNoLock_ConcurrentCreation(t *testing.T) { defer wg.Done() timer := &databases.DbTimer{ Id: fmt.Sprintf("concurrent-nolock-timer-%d", idx), - TimerUuid: generateTimerUUID(namespace, fmt.Sprintf("concurrent-nolock-timer-%d", idx)), + TimerUuid: databases.GenerateTimerUUID(namespace, fmt.Sprintf("concurrent-nolock-timer-%d", idx)), Namespace: namespace, ExecuteAt: time.Now().Add(time.Duration(idx) * time.Minute), CallbackUrl: fmt.Sprintf("https://example.com/nolock/callback/%d", idx), @@ -623,8 +613,8 @@ func TestCreateTimer_DuplicateTimerOverwrite(t *testing.T) { shardId := 1 namespace := "test_namespace" timerId := "duplicate-timer" - baseUuid := generateTimerUUID(namespace, timerId) - alternateUuid := generateTimerUUID(namespace, timerId+"_alt") + baseUuid := databases.GenerateTimerUUID(namespace, timerId) + alternateUuid := databases.GenerateTimerUUID(namespace, timerId+"_alt") // Create shard record ownerId := "owner-1" @@ -751,7 +741,7 @@ func TestCreateTimerNoLock_DuplicateTimerOverwrite(t *testing.T) { // Create initial timer originalTimer := &databases.DbTimer{ Id: "duplicate-timer-nolock", - TimerUuid: generateTimerUUID(namespace, "duplicate-timer-nolock"), + TimerUuid: databases.GenerateTimerUUID(namespace, "duplicate-timer-nolock"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://original-nolock.com/callback", @@ -784,8 +774,8 @@ func TestCreateTimerNoLock_DuplicateTimerOverwrite(t *testing.T) { // Create updated timer with same ID (should overwrite due to DynamoDB's PutItem behavior) updatedTimer := &databases.DbTimer{ - Id: "duplicate-timer-nolock", // Same ID - TimerUuid: generateTimerUUID(namespace, "duplicate-timer-nolock"), // Same UUID + Id: "duplicate-timer-nolock", // Same ID + TimerUuid: databases.GenerateTimerUUID(namespace, "duplicate-timer-nolock"), // Same UUID Namespace: namespace, ExecuteAt: time.Now().Add(15 * time.Minute), // Different execution time CallbackUrl: "https://updated-nolock.com/callback", // Different callback diff --git a/server/databases/mongodb/mongodb_timer_store_impl.go b/server/databases/mongodb/mongodb_timer_store_impl.go index 43040e4..6a81f72 100644 --- a/server/databases/mongodb/mongodb_timer_store_impl.go +++ b/server/databases/mongodb/mongodb_timer_store_impl.go @@ -83,16 +83,18 @@ func (m *MongoDBTimerStore) ClaimShardOwnership( ctx context.Context, shardId int, ownerId string, metadata interface{}, ) (shardVersion int64, retErr *databases.DbError) { // Convert ZeroUUID to high/low format for shard records - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Serialize metadata to JSON - var metadataJSON string + var metadataJSON interface{} if metadata != nil { metadataBytes, err := json.Marshal(metadata) if err != nil { return 0, databases.NewGenericDbError("failed to marshal metadata", err) } metadataJSON = string(metadataBytes) + } else { + metadataJSON = nil } now := time.Now().UTC() @@ -243,10 +245,10 @@ func isDuplicateKeyError(err error) bool { func (m *MongoDBTimerStore) CreateTimer(ctx context.Context, shardId int, shardVersion int64, namespace string, timer *databases.DbTimer) (err *databases.DbError) { // Convert the provided timer UUID to high/low format for predictable pagination - timerUuidHigh, timerUuidLow, _ := databases.UuidToHighLow(timer.TimerUuid) + timerUuidHigh, timerUuidLow := databases.UuidToHighLow(timer.TimerUuid) // Convert ZeroUUID to high/low format for shard records - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Serialize payload and retry policy to JSON var payloadJSON, retryPolicyJSON interface{} @@ -381,7 +383,7 @@ func (m *MongoDBTimerStore) buildTimerDocumentForUpsert(shardId int, timer *data func (m *MongoDBTimerStore) CreateTimerNoLock(ctx context.Context, shardId int, namespace string, timer *databases.DbTimer) (err *databases.DbError) { // Convert the provided timer UUID to high/low format for predictable pagination - timerUuidHigh, timerUuidLow, _ := databases.UuidToHighLow(timer.TimerUuid) + timerUuidHigh, timerUuidLow := databases.UuidToHighLow(timer.TimerUuid) // Serialize payload and retry policy to JSON var payloadJSON, retryPolicyJSON interface{} diff --git a/server/databases/mongodb/mongodb_timer_store_impl_claim_test.go b/server/databases/mongodb/mongodb_timer_store_impl_claim_test.go index 0750e17..f673e73 100644 --- a/server/databases/mongodb/mongodb_timer_store_impl_claim_test.go +++ b/server/databases/mongodb/mongodb_timer_store_impl_claim_test.go @@ -31,7 +31,7 @@ func TestClaimShardOwnership_NewShard(t *testing.T) { } // Convert ZeroUUID to high/low format for test queries - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Claim ownership of a new shard version, err := store.ClaimShardOwnership(ctx, shardId, ownerId, metadata) @@ -56,8 +56,8 @@ func TestClaimShardOwnership_NewShard(t *testing.T) { assert.Equal(t, ownerId, getStringFromBSON(result, "shard_owner_id")) metadataStr := getStringFromBSON(result, "shard_metadata") - assert.Contains(t, metadataStr, "instance-1") assert.Contains(t, metadataStr, "us-west-2") + assert.Contains(t, metadataStr, "us-west-2a") claimedAt := getTimeFromBSON(result, "shard_claimed_at") assert.True(t, time.Since(claimedAt) < 5*time.Second, "claimed_at should be recent") @@ -85,13 +85,16 @@ func TestClaimShardOwnership_ExistingShard(t *testing.T) { assert.Nil(t, err3) assert.Equal(t, int64(3), version3) + // Convert ZeroUUID to high/low format for test queries + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) + // Verify final state filter := bson.M{ "shard_id": shardId, "row_type": databases.RowTypeShard, "timer_execute_at": databases.ZeroTimestamp, - "timer_uuid_high": int64(0), - "timer_uuid_low": int64(0), + "timer_uuid_high": zeroUuidHigh, + "timer_uuid_low": zeroUuidLow, } var result bson.M @@ -163,13 +166,16 @@ func TestClaimShardOwnership_ConcurrentClaims(t *testing.T) { assert.Greater(t, failureCount, 1, "Should have some failures due to concurrency") assert.Greater(t, maxVersion, int64(0), "Maximum version should be positive") + // Convert ZeroUUID to high/low format for test queries + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) + // Verify final database state filter := bson.M{ "shard_id": shardId, "row_type": databases.RowTypeShard, "timer_execute_at": databases.ZeroTimestamp, - "timer_uuid_high": int64(0), - "timer_uuid_low": int64(0), + "timer_uuid_high": zeroUuidHigh, + "timer_uuid_low": zeroUuidLow, } var result bson.M @@ -194,13 +200,16 @@ func TestClaimShardOwnership_NilMetadata(t *testing.T) { assert.Nil(t, err) assert.Equal(t, int64(1), version) + // Convert ZeroUUID to high/low format for test queries + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) + // Verify metadata is empty/null in database filter := bson.M{ "shard_id": shardId, "row_type": databases.RowTypeShard, "timer_execute_at": databases.ZeroTimestamp, - "timer_uuid_high": int64(0), - "timer_uuid_low": int64(0), + "timer_uuid_high": zeroUuidHigh, + "timer_uuid_low": zeroUuidLow, } var result bson.M @@ -241,13 +250,16 @@ func TestClaimShardOwnership_ComplexMetadata(t *testing.T) { assert.Nil(t, err) assert.Equal(t, int64(1), version) + // Convert ZeroUUID to high/low format for test queries + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) + // Verify metadata is properly serialized filter := bson.M{ "shard_id": shardId, "row_type": databases.RowTypeShard, "timer_execute_at": databases.ZeroTimestamp, - "timer_uuid_high": int64(0), - "timer_uuid_low": int64(0), + "timer_uuid_high": zeroUuidHigh, + "timer_uuid_low": zeroUuidLow, } var result bson.M diff --git a/server/databases/mongodb/mongodb_timer_store_impl_create_timer_test.go b/server/databases/mongodb/mongodb_timer_store_impl_create_timer_test.go index 9531f7f..51f37d4 100644 --- a/server/databases/mongodb/mongodb_timer_store_impl_create_timer_test.go +++ b/server/databases/mongodb/mongodb_timer_store_impl_create_timer_test.go @@ -2,27 +2,17 @@ package mongodb import ( "context" - "crypto/md5" "fmt" "sync" "testing" "time" - "github.com/google/uuid" "github.com/iworkflowio/durable-timer/databases" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/bson" ) -// generateTimerUUID creates a stable UUID from timer namespace and ID for consistent upsert behavior -func generateTimerUUID(namespace, timerId string) uuid.UUID { - // Create a deterministic UUID based on namespace and timer ID - hash := md5.Sum([]byte(fmt.Sprintf("%s:%s", namespace, timerId))) - uuid, _ := uuid.FromBytes(hash[:]) - return uuid -} - func TestCreateTimer_Basic(t *testing.T) { store, cleanup := setupTestStore(t) defer cleanup() @@ -40,7 +30,7 @@ func TestCreateTimer_Basic(t *testing.T) { // Create a timer timer := &databases.DbTimer{ Id: "timer-1", - TimerUuid: generateTimerUUID(namespace, "timer-1"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-1"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -93,7 +83,7 @@ func TestCreateTimer_WithPayload(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-with-payload", - TimerUuid: generateTimerUUID(namespace, "timer-with-payload"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-with-payload"), Namespace: namespace, ExecuteAt: time.Now().Add(10 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -146,7 +136,7 @@ func TestCreateTimer_WithRetryPolicy(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-with-retry", - TimerUuid: generateTimerUUID(namespace, "timer-with-retry"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-with-retry"), Namespace: namespace, ExecuteAt: time.Now().Add(15 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -190,7 +180,7 @@ func TestCreateTimer_ShardVersionMismatch(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-version-mismatch", - TimerUuid: generateTimerUUID(namespace, "timer-version-mismatch"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-version-mismatch"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -244,7 +234,7 @@ func TestCreateTimer_ConcurrentCreation(t *testing.T) { defer wg.Done() timer := &databases.DbTimer{ Id: fmt.Sprintf("concurrent-timer-%d", idx), - TimerUuid: generateTimerUUID(namespace, fmt.Sprintf("concurrent-timer-%d", idx)), + TimerUuid: databases.GenerateTimerUUID(namespace, fmt.Sprintf("concurrent-timer-%d", idx)), Namespace: namespace, ExecuteAt: time.Now().Add(time.Duration(idx) * time.Minute), CallbackUrl: fmt.Sprintf("https://example.com/callback/%d", idx), @@ -291,7 +281,7 @@ func TestCreateTimer_NoShardRecord(t *testing.T) { // Don't claim the shard first - no shard record exists timer := &databases.DbTimer{ Id: "timer-no-shard", - TimerUuid: generateTimerUUID(namespace, "timer-no-shard"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-no-shard"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -318,7 +308,7 @@ func TestCreateTimerNoLock_Basic(t *testing.T) { // Create a basic timer without needing shard ownership timer := &databases.DbTimer{ Id: "timer-nolock-1", - TimerUuid: generateTimerUUID(namespace, "timer-nolock-1"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-nolock-1"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -367,7 +357,7 @@ func TestCreateTimerNoLock_WithPayload(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-nolock-payload", - TimerUuid: generateTimerUUID(namespace, "timer-nolock-payload"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-nolock-payload"), Namespace: namespace, ExecuteAt: time.Now().Add(10 * time.Minute), CallbackUrl: "https://example.com/nolock/callback", @@ -416,7 +406,7 @@ func TestCreateTimerNoLock_WithRetryPolicy(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-nolock-retry", - TimerUuid: generateTimerUUID(namespace, "timer-nolock-retry"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-nolock-retry"), Namespace: namespace, ExecuteAt: time.Now().Add(15 * time.Minute), CallbackUrl: "https://example.com/nolock/retry", @@ -457,7 +447,7 @@ func TestCreateTimerNoLock_NilPayloadAndRetryPolicy(t *testing.T) { // Create timer with nil payload and retry policy timer := &databases.DbTimer{ Id: "timer-nolock-nil-fields", - TimerUuid: generateTimerUUID(namespace, "timer-nolock-nil-fields"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-nolock-nil-fields"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/nolock/nil", @@ -501,7 +491,7 @@ func TestCreateTimerNoLock_InvalidPayloadSerialization(t *testing.T) { // Create timer with non-serializable payload (function type) timer := &databases.DbTimer{ Id: "timer-nolock-invalid-payload", - TimerUuid: generateTimerUUID(namespace, "timer-nolock-invalid-payload"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-nolock-invalid-payload"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/nolock/invalid", @@ -536,7 +526,7 @@ func TestCreateTimerNoLock_ConcurrentCreation(t *testing.T) { defer wg.Done() timer := &databases.DbTimer{ Id: fmt.Sprintf("concurrent-nolock-timer-%d", idx), - TimerUuid: generateTimerUUID(namespace, fmt.Sprintf("concurrent-nolock-timer-%d", idx)), + TimerUuid: databases.GenerateTimerUUID(namespace, fmt.Sprintf("concurrent-nolock-timer-%d", idx)), Namespace: namespace, ExecuteAt: time.Now().Add(time.Duration(idx) * time.Minute), CallbackUrl: fmt.Sprintf("https://example.com/nolock/callback/%d", idx), @@ -580,8 +570,8 @@ func TestCreateTimer_DuplicateTimerOverwrite(t *testing.T) { shardId := 1 namespace := "test_namespace" timerId := "duplicate-timer" - baseUuid := generateTimerUUID(namespace, timerId) - alternateUuid := generateTimerUUID(namespace, timerId+"_alt") + baseUuid := databases.GenerateTimerUUID(namespace, timerId) + alternateUuid := databases.GenerateTimerUUID(namespace, timerId+"_alt") // Create shard record ownerId := "owner-1" diff --git a/server/databases/mysql/mysql_timer_store_impl.go b/server/databases/mysql/mysql_timer_store_impl.go index 75d4783..46f42fa 100644 --- a/server/databases/mysql/mysql_timer_store_impl.go +++ b/server/databases/mysql/mysql_timer_store_impl.go @@ -62,7 +62,7 @@ func (m *MySQLTimerStore) ClaimShardOwnership( ctx context.Context, shardId int, ownerId string, metadata interface{}, ) (shardVersion int64, retErr *databases.DbError) { // Convert ZeroUUID to high/low format for shard records - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Serialize metadata to JSON var metadataJSON interface{} @@ -173,10 +173,10 @@ func isDuplicateKeyError(err error) bool { func (m *MySQLTimerStore) CreateTimer(ctx context.Context, shardId int, shardVersion int64, namespace string, timer *databases.DbTimer) (err *databases.DbError) { // Convert the provided timer UUID to high/low format for predictable pagination - timerUuidHigh, timerUuidLow, _ := databases.UuidToHighLow(timer.TimerUuid) + timerUuidHigh, timerUuidLow := databases.UuidToHighLow(timer.TimerUuid) // Convert ZeroUUID to high/low format for shard records - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Serialize payload and retry policy to JSON var payloadJSON, retryPolicyJSON interface{} @@ -269,7 +269,7 @@ func (m *MySQLTimerStore) CreateTimer(ctx context.Context, shardId int, shardVer func (m *MySQLTimerStore) CreateTimerNoLock(ctx context.Context, shardId int, namespace string, timer *databases.DbTimer) (err *databases.DbError) { // Convert the provided timer UUID to high/low format for predictable pagination - timerUuidHigh, timerUuidLow, _ := databases.UuidToHighLow(timer.TimerUuid) + timerUuidHigh, timerUuidLow := databases.UuidToHighLow(timer.TimerUuid) // Serialize payload and retry policy to JSON var payloadJSON, retryPolicyJSON interface{} diff --git a/server/databases/mysql/mysql_timer_store_impl_claim_test.go b/server/databases/mysql/mysql_timer_store_impl_claim_test.go index 1452878..9bc6f36 100644 --- a/server/databases/mysql/mysql_timer_store_impl_claim_test.go +++ b/server/databases/mysql/mysql_timer_store_impl_claim_test.go @@ -30,7 +30,7 @@ func TestClaimShardOwnership_NewShard(t *testing.T) { } // Convert ZeroUUID to high/low format for test queries - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Claim ownership of a new shard version, err := store.ClaimShardOwnership(ctx, shardId, ownerId, metadata) @@ -78,7 +78,7 @@ func TestClaimShardOwnership_ExistingShard(t *testing.T) { assert.Equal(t, int64(3), version3) // Convert ZeroUUID to high/low format for test queries - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Verify final state var dbVersion int64 @@ -153,7 +153,7 @@ func TestClaimShardOwnership_ConcurrentClaims(t *testing.T) { assert.Greater(t, maxVersion, int64(0), "Maximum version should be positive") // Convert ZeroUUID to high/low format for test queries - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Verify final database state var dbVersion int64 @@ -181,7 +181,7 @@ func TestClaimShardOwnership_NilMetadata(t *testing.T) { assert.Equal(t, int64(1), version) // Convert ZeroUUID to high/low format for test queries - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Verify metadata is empty/null in database var dbMetadata *string @@ -225,7 +225,7 @@ func TestClaimShardOwnership_ComplexMetadata(t *testing.T) { assert.Equal(t, int64(1), version) // Convert ZeroUUID to high/low format for test queries - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Verify metadata is properly serialized var dbMetadata string diff --git a/server/databases/mysql/mysql_timer_store_impl_create_timer_test.go b/server/databases/mysql/mysql_timer_store_impl_create_timer_test.go index da40374..32de119 100644 --- a/server/databases/mysql/mysql_timer_store_impl_create_timer_test.go +++ b/server/databases/mysql/mysql_timer_store_impl_create_timer_test.go @@ -2,27 +2,17 @@ package mysql import ( "context" - "crypto/md5" "encoding/json" "fmt" "sync" "testing" "time" - "github.com/google/uuid" "github.com/iworkflowio/durable-timer/databases" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -// generateTimerUUID creates a stable UUID from timer namespace and ID for consistent upsert behavior -func generateTimerUUID(namespace, timerId string) uuid.UUID { - // Create a deterministic UUID based on namespace and timer ID - hash := md5.Sum([]byte(fmt.Sprintf("%s:%s", namespace, timerId))) - uuid, _ := uuid.FromBytes(hash[:]) - return uuid -} - func TestCreateTimer_Basic(t *testing.T) { store, cleanup := setupTestStore(t) defer cleanup() @@ -40,7 +30,7 @@ func TestCreateTimer_Basic(t *testing.T) { // Create a timer timer := &databases.DbTimer{ Id: "timer-1", - TimerUuid: generateTimerUUID(namespace, "timer-1"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-1"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -93,7 +83,7 @@ func TestCreateTimer_WithPayload(t *testing.T) { } timer := &databases.DbTimer{ Id: "timer-with-payload", - TimerUuid: generateTimerUUID(namespace, "timer-with-payload"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-with-payload"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -148,7 +138,7 @@ func TestCreateTimer_WithRetryPolicy(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-with-retry", - TimerUuid: generateTimerUUID(namespace, "timer-with-retry"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-with-retry"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -195,7 +185,7 @@ func TestCreateTimer_ShardVersionMismatch(t *testing.T) { // Create timer with wrong shard version timer := &databases.DbTimer{ Id: "timer-version-mismatch", - TimerUuid: generateTimerUUID(namespace, "timer-version-mismatch"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-version-mismatch"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -250,7 +240,7 @@ func TestCreateTimer_ConcurrentCreation(t *testing.T) { timer := &databases.DbTimer{ Id: fmt.Sprintf("concurrent-timer-%d", index), - TimerUuid: generateTimerUUID(namespace, fmt.Sprintf("concurrent-timer-%d", index)), + TimerUuid: databases.GenerateTimerUUID(namespace, fmt.Sprintf("concurrent-timer-%d", index)), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: fmt.Sprintf("https://example.com/callback/%d", index), @@ -300,7 +290,7 @@ func TestCreateTimer_NoShardRecord(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-no-shard", - TimerUuid: generateTimerUUID(namespace, "timer-no-shard"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-no-shard"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -335,8 +325,8 @@ func TestCreateTimer_DuplicateTimerOverwrite(t *testing.T) { shardId := 1 namespace := "test_namespace" timerId := "duplicate-timer" - baseUuid := generateTimerUUID(namespace, timerId) - alternateUuid := generateTimerUUID(namespace, timerId+"_alt") + baseUuid := databases.GenerateTimerUUID(namespace, timerId) + alternateUuid := databases.GenerateTimerUUID(namespace, timerId+"_alt") // Create shard record ownerId := "owner-1" @@ -450,7 +440,7 @@ func TestCreateTimerNoLock_Basic(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-no-lock", - TimerUuid: generateTimerUUID(namespace, "timer-no-lock"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-no-lock"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -487,7 +477,7 @@ func TestCreateTimerNoLock_WithPayload(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-no-lock-payload", - TimerUuid: generateTimerUUID(namespace, "timer-no-lock-payload"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-no-lock-payload"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -526,7 +516,7 @@ func TestCreateTimerNoLock_WithRetryPolicy(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-no-lock-retry", - TimerUuid: generateTimerUUID(namespace, "timer-no-lock-retry"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-no-lock-retry"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -560,7 +550,7 @@ func TestCreateTimerNoLock_NilPayloadAndRetryPolicy(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-no-lock-nil", - TimerUuid: generateTimerUUID(namespace, "timer-no-lock-nil"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-no-lock-nil"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -602,7 +592,7 @@ func TestCreateTimerNoLock_InvalidPayloadSerialization(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-invalid-payload", - TimerUuid: generateTimerUUID(namespace, "timer-invalid-payload"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-invalid-payload"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -630,7 +620,7 @@ func TestCreateTimerNoLock_DuplicateTimerOverwrite(t *testing.T) { // Create initial timer originalTimer := &databases.DbTimer{ Id: "duplicate-timer-nolock", - TimerUuid: generateTimerUUID(namespace, "duplicate-timer-nolock"), + TimerUuid: databases.GenerateTimerUUID(namespace, "duplicate-timer-nolock"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://original-nolock.com/callback", @@ -657,8 +647,8 @@ func TestCreateTimerNoLock_DuplicateTimerOverwrite(t *testing.T) { // Create updated timer with same ID (should overwrite) updatedTimer := &databases.DbTimer{ - Id: "duplicate-timer-nolock", // Same ID - TimerUuid: generateTimerUUID(namespace, "duplicate-timer-nolock"), // Same UUID + Id: "duplicate-timer-nolock", // Same ID + TimerUuid: databases.GenerateTimerUUID(namespace, "duplicate-timer-nolock"), // Same UUID Namespace: namespace, ExecuteAt: time.Now().Add(15 * time.Minute), // Different execution time CallbackUrl: "https://updated-nolock.com/callback", // Different callback @@ -713,7 +703,7 @@ func TestCreateTimerNoLock_ConcurrentCreation(t *testing.T) { timer := &databases.DbTimer{ Id: fmt.Sprintf("concurrent-nolock-timer-%d", index), - TimerUuid: generateTimerUUID(namespace, fmt.Sprintf("concurrent-nolock-timer-%d", index)), + TimerUuid: databases.GenerateTimerUUID(namespace, fmt.Sprintf("concurrent-nolock-timer-%d", index)), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: fmt.Sprintf("https://example.com/callback/%d", index), diff --git a/server/databases/postgresql/postgresql_timer_store_impl.go b/server/databases/postgresql/postgresql_timer_store_impl.go index 0d65d6a..50196b2 100644 --- a/server/databases/postgresql/postgresql_timer_store_impl.go +++ b/server/databases/postgresql/postgresql_timer_store_impl.go @@ -60,7 +60,7 @@ func (p *PostgreSQLTimerStore) ClaimShardOwnership( ctx context.Context, shardId int, ownerId string, metadata interface{}, ) (shardVersion int64, retErr *databases.DbError) { // Convert ZeroUUID to high/low format for shard records - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Serialize metadata to JSON var metadataJSON interface{} @@ -170,10 +170,10 @@ func isDuplicateKeyError(err error) bool { func (p *PostgreSQLTimerStore) CreateTimer(ctx context.Context, shardId int, shardVersion int64, namespace string, timer *databases.DbTimer) (err *databases.DbError) { // Convert the provided timer UUID to high/low format for predictable pagination - timerUuidHigh, timerUuidLow, _ := databases.UuidToHighLow(timer.TimerUuid) + timerUuidHigh, timerUuidLow := databases.UuidToHighLow(timer.TimerUuid) // Convert ZeroUUID to high/low format for shard records - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Serialize payload and retry policy to JSON var payloadJSON, retryPolicyJSON interface{} @@ -266,7 +266,7 @@ func (p *PostgreSQLTimerStore) CreateTimer(ctx context.Context, shardId int, sha func (p *PostgreSQLTimerStore) CreateTimerNoLock(ctx context.Context, shardId int, namespace string, timer *databases.DbTimer) (err *databases.DbError) { // Convert the provided timer UUID to high/low format for predictable pagination - timerUuidHigh, timerUuidLow, _ := databases.UuidToHighLow(timer.TimerUuid) + timerUuidHigh, timerUuidLow := databases.UuidToHighLow(timer.TimerUuid) // Serialize payload and retry policy to JSON var payloadJSON, retryPolicyJSON interface{} diff --git a/server/databases/postgresql/postgresql_timer_store_impl_claim_test.go b/server/databases/postgresql/postgresql_timer_store_impl_claim_test.go index db273e1..c80b8dc 100644 --- a/server/databases/postgresql/postgresql_timer_store_impl_claim_test.go +++ b/server/databases/postgresql/postgresql_timer_store_impl_claim_test.go @@ -30,7 +30,7 @@ func TestClaimShardOwnership_NewShard(t *testing.T) { } // Convert ZeroUUID to high/low format for test queries - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Claim ownership of a new shard version, err := store.ClaimShardOwnership(ctx, shardId, ownerId, metadata) @@ -78,7 +78,7 @@ func TestClaimShardOwnership_ExistingShard(t *testing.T) { assert.Equal(t, int64(3), version3) // Convert ZeroUUID to high/low format for test queries - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Verify final state var dbVersion int64 @@ -153,7 +153,7 @@ func TestClaimShardOwnership_ConcurrentClaims(t *testing.T) { assert.Greater(t, maxVersion, int64(0), "Maximum version should be positive") // Convert ZeroUUID to high/low format for test queries - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Verify final database state var dbVersion int64 @@ -181,7 +181,7 @@ func TestClaimShardOwnership_NilMetadata(t *testing.T) { assert.Equal(t, int64(1), version) // Convert ZeroUUID to high/low format for test queries - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Verify metadata is empty/null in database var dbMetadata *string @@ -223,7 +223,7 @@ func TestClaimShardOwnership_ComplexMetadata(t *testing.T) { assert.Equal(t, int64(1), version) // Convert ZeroUUID to high/low format for test queries - zeroUuidHigh, zeroUuidLow, _ := databases.UuidToHighLow(databases.ZeroUUID) + zeroUuidHigh, zeroUuidLow := databases.UuidToHighLow(databases.ZeroUUID) // Verify metadata is properly serialized var dbMetadata string diff --git a/server/databases/postgresql/postgresql_timer_store_impl_create_timer_test.go b/server/databases/postgresql/postgresql_timer_store_impl_create_timer_test.go index 8e6bcc8..66ad938 100644 --- a/server/databases/postgresql/postgresql_timer_store_impl_create_timer_test.go +++ b/server/databases/postgresql/postgresql_timer_store_impl_create_timer_test.go @@ -2,27 +2,17 @@ package postgresql import ( "context" - "crypto/md5" "encoding/json" "fmt" "sync" "testing" "time" - "github.com/google/uuid" "github.com/iworkflowio/durable-timer/databases" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -// generateTimerUUID creates a stable UUID from timer namespace and ID for consistent upsert behavior -func generateTimerUUID(namespace, timerId string) uuid.UUID { - // Create a deterministic UUID based on namespace and timer ID - hash := md5.Sum([]byte(fmt.Sprintf("%s:%s", namespace, timerId))) - uuid, _ := uuid.FromBytes(hash[:]) - return uuid -} - func TestCreateTimer_Basic(t *testing.T) { store, cleanup := setupTestStore(t) defer cleanup() @@ -40,7 +30,7 @@ func TestCreateTimer_Basic(t *testing.T) { // Create a timer timer := &databases.DbTimer{ Id: "timer-1", - TimerUuid: generateTimerUUID(namespace, "timer-1"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-1"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -93,7 +83,7 @@ func TestCreateTimer_WithPayload(t *testing.T) { } timer := &databases.DbTimer{ Id: "timer-with-payload", - TimerUuid: generateTimerUUID(namespace, "timer-with-payload"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-with-payload"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -148,7 +138,7 @@ func TestCreateTimer_WithRetryPolicy(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-with-retry", - TimerUuid: generateTimerUUID(namespace, "timer-with-retry"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-with-retry"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -195,7 +185,7 @@ func TestCreateTimer_ShardVersionMismatch(t *testing.T) { // Create timer with wrong shard version timer := &databases.DbTimer{ Id: "timer-version-mismatch", - TimerUuid: generateTimerUUID(namespace, "timer-version-mismatch"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-version-mismatch"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -250,7 +240,7 @@ func TestCreateTimer_ConcurrentCreation(t *testing.T) { timer := &databases.DbTimer{ Id: fmt.Sprintf("concurrent-timer-%d", index), - TimerUuid: generateTimerUUID(namespace, fmt.Sprintf("concurrent-timer-%d", index)), + TimerUuid: databases.GenerateTimerUUID(namespace, fmt.Sprintf("concurrent-timer-%d", index)), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: fmt.Sprintf("https://example.com/callback/%d", index), @@ -300,7 +290,7 @@ func TestCreateTimer_NoShardRecord(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-no-shard", - TimerUuid: generateTimerUUID(namespace, "timer-no-shard"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-no-shard"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -335,8 +325,8 @@ func TestCreateTimer_DuplicateTimerOverwrite(t *testing.T) { shardId := 1 namespace := "test_namespace" timerId := "duplicate-timer" - baseUuid := generateTimerUUID(namespace, timerId) - alternateUuid := generateTimerUUID(namespace, timerId+"_alt") + baseUuid := databases.GenerateTimerUUID(namespace, timerId) + alternateUuid := databases.GenerateTimerUUID(namespace, timerId+"_alt") // Create shard record ownerId := "owner-1" @@ -450,7 +440,7 @@ func TestCreateTimerNoLock_Basic(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-no-lock", - TimerUuid: generateTimerUUID(namespace, "timer-no-lock"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-no-lock"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -487,7 +477,7 @@ func TestCreateTimerNoLock_WithPayload(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-no-lock-payload", - TimerUuid: generateTimerUUID(namespace, "timer-no-lock-payload"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-no-lock-payload"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -526,7 +516,7 @@ func TestCreateTimerNoLock_WithRetryPolicy(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-no-lock-retry", - TimerUuid: generateTimerUUID(namespace, "timer-no-lock-retry"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-no-lock-retry"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -560,7 +550,7 @@ func TestCreateTimerNoLock_NilPayloadAndRetryPolicy(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-no-lock-nil", - TimerUuid: generateTimerUUID(namespace, "timer-no-lock-nil"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-no-lock-nil"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -602,7 +592,7 @@ func TestCreateTimerNoLock_InvalidPayloadSerialization(t *testing.T) { timer := &databases.DbTimer{ Id: "timer-invalid-payload", - TimerUuid: generateTimerUUID(namespace, "timer-invalid-payload"), + TimerUuid: databases.GenerateTimerUUID(namespace, "timer-invalid-payload"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://example.com/callback", @@ -630,7 +620,7 @@ func TestCreateTimerNoLock_DuplicateTimerOverwrite(t *testing.T) { // Create initial timer originalTimer := &databases.DbTimer{ Id: "duplicate-timer-nolock", - TimerUuid: generateTimerUUID(namespace, "duplicate-timer-nolock"), + TimerUuid: databases.GenerateTimerUUID(namespace, "duplicate-timer-nolock"), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: "https://original-nolock.com/callback", @@ -657,8 +647,8 @@ func TestCreateTimerNoLock_DuplicateTimerOverwrite(t *testing.T) { // Create updated timer with same ID (should overwrite) updatedTimer := &databases.DbTimer{ - Id: "duplicate-timer-nolock", // Same ID - TimerUuid: generateTimerUUID(namespace, "duplicate-timer-nolock"), // Same stable UUID + Id: "duplicate-timer-nolock", // Same ID + TimerUuid: databases.GenerateTimerUUID(namespace, "duplicate-timer-nolock"), // Same stable UUID Namespace: namespace, ExecuteAt: time.Now().Add(15 * time.Minute), // Different execution time CallbackUrl: "https://updated-nolock.com/callback", // Different callback @@ -713,7 +703,7 @@ func TestCreateTimerNoLock_ConcurrentCreation(t *testing.T) { timer := &databases.DbTimer{ Id: fmt.Sprintf("concurrent-nolock-timer-%d", index), - TimerUuid: generateTimerUUID(namespace, fmt.Sprintf("concurrent-nolock-timer-%d", index)), + TimerUuid: databases.GenerateTimerUUID(namespace, fmt.Sprintf("concurrent-nolock-timer-%d", index)), Namespace: namespace, ExecuteAt: time.Now().Add(5 * time.Minute), CallbackUrl: fmt.Sprintf("https://example.com/callback/%d", index), diff --git a/server/databases/timer_uuid.go b/server/databases/timer_uuid.go index 2349903..da4a412 100644 --- a/server/databases/timer_uuid.go +++ b/server/databases/timer_uuid.go @@ -1,19 +1,21 @@ package databases import ( + "crypto/md5" "encoding/binary" "fmt" - "github.com/google/uuid" "time" + + "github.com/google/uuid" ) -// UuidToHighLow converts a UUID string to high and low 64-bit integers +// UuidToHighLow converts a UUID to high and low 64-bit integers // for predictable pagination ordering across databases. -func UuidToHighLow(uuid uuid.UUID) (high, low int64, err error) { +func UuidToHighLow(uuid uuid.UUID) (high, low int64) { high = int64(binary.BigEndian.Uint64(uuid[0:8])) low = int64(binary.BigEndian.Uint64(uuid[8:16])) - return high, low, nil + return high, low } // HighLowToUuid converts high and low 64-bit integers back to a UUID string @@ -25,6 +27,14 @@ func HighLowToUuid(high, low int64) uuid.UUID { return uuid } +// GenerateTimerUUID creates a stable UUID from timer namespace and ID for consistent upsert behavior +func GenerateTimerUUID(namespace, timerId string) uuid.UUID { + // Create a deterministic UUID based on namespace and timer ID + hash := md5.Sum([]byte(fmt.Sprintf("%s:%s", namespace, timerId))) + uuid, _ := uuid.FromBytes(hash[:]) + return uuid +} + // FormatExecuteAtWithUuid creates a composite field for DynamoDB pagination // Format: "2025-01-01T10:00:00.000Z#550e8400-e29b-41d4-a716-446655440000" func FormatExecuteAtWithUuid(executeAt time.Time, uuidStr string) string {