Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions EventSourcing.Core/Records/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public record Event : Record
/// <summary>
/// The index of this <see cref="Event"/> with respect to <see cref="Record.AggregateId"/>
/// </summary>
[JsonPropertyOrder(-2)]
public long Index { get; init; }

/// <summary>
Expand Down
35 changes: 21 additions & 14 deletions EventSourcing.Core/Records/Record.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,10 @@ public enum RecordKind
/// </summary>
public abstract record Record
{
/// <summary>
/// <see cref="RecordKind"/> of this <see cref="Record"/>.
/// </summary>
/// <remarks>
/// Used to differentiate between <see cref="Record"/> kinds in database queries
/// </remarks>
public RecordKind Kind => this switch
{
Projection => RecordKind.Projection,
Snapshot => RecordKind.Snapshot,
Event => RecordKind.Event,
_ => RecordKind.None
};

/// <summary>
/// String representation of Record Type. Defaults to <c>GetType().Name</c>
/// </summary>
[JsonPropertyOrder(-8)]
public string Type { get; init; }

/// <summary>
Expand All @@ -58,8 +45,24 @@ public abstract record Record
/// Set to <see cref="Aggregate{TAggregate}"/>.<see cref="Aggregate{TAggregate}.Type"/> when <see cref="Event"/> is added to an Aggregate.
/// </para>
/// </remarks>
[JsonPropertyOrder(-7)]
public string? AggregateType { get; init; }

/// <summary>
/// <see cref="RecordKind"/> of this <see cref="Record"/>.
/// </summary>
/// <remarks>
/// Used to differentiate between <see cref="Record"/> kinds in database queries
/// </remarks>
[JsonPropertyOrder(-6)]
public RecordKind Kind => this switch
{
Projection => RecordKind.Projection,
Snapshot => RecordKind.Snapshot,
Event => RecordKind.Event,
_ => RecordKind.None
};

/// <summary>
/// Unique Partition identifier.
/// </summary>
Expand All @@ -76,6 +79,7 @@ public abstract record Record
/// i.e. no transactions involving multiple <see cref="PartitionId"/>'s can be committed.
/// </para>
/// </remarks>
[JsonPropertyOrder(-5)]
public Guid PartitionId { get; init; }

/// <summary>
Expand All @@ -86,16 +90,19 @@ public abstract record Record
/// Set to <see cref="Aggregate{TAggregate}"/>.<see cref="Aggregate{TAggregate}.Id"/> when <see cref="Event"/> is added to an Aggregate.
/// </para>
/// </remarks>
[JsonPropertyOrder(-4)]
public Guid AggregateId { get; init; }

/// <summary>
/// Record creation/update time. Defaults to <see cref="DateTimeOffset"/>.<see cref="DateTimeOffset.UtcNow"/> on creation.
/// </summary>
[JsonPropertyOrder(-3)]
public DateTimeOffset Timestamp { get; init; }

/// <summary>
/// Unique Database identifier.
/// </summary>
[JsonPropertyOrder(-1)]
public abstract string id { get; }

/// <summary>
Expand Down
1 change: 1 addition & 0 deletions EventSourcing.Core/Records/Snapshot.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public record Snapshot : Record
/// <summary>
/// The index of this <see cref="Event"/> with respect to <see cref="Record.AggregateId"/>
/// </summary>
[JsonPropertyOrder(-2)]
public long Index { get; init; }

/// <summary>
Expand Down
31 changes: 5 additions & 26 deletions EventSourcing.Cosmos/RecordConverter/RecordConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,11 @@ public override TRecord Read(ref Utf8JsonReader reader, Type typeToConvert, Json

private Type DeserializeRecordType(Utf8JsonReader reader)
{
var json = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(ref reader);

// Get Record.Type String from Json
if (json == null || !json.TryGetValue("Type", out var typeString) || typeString.ValueKind != JsonValueKind.String)
if (reader.TokenType == JsonTokenType.StartObject &&
reader.Read() && reader.TokenType == JsonTokenType.PropertyName && reader.GetString() == nameof(Record.Type) &&
reader.Read() && reader.TokenType == JsonTokenType.String)
return _recordTypeCache.GetRecordType(reader.GetString()!);

// Throw Exception when json has no "Type" Property
throw new RecordValidationException(
$"Error converting {typeof(TRecord)}. " +
$"Couldn't parse {typeof(TRecord)}.Type string from Json. " +
$"Does the Json contain a {nameof(Record.Type)} field?");

var type = _recordTypeCache.GetRecordType(typeString.GetString()!);

if (!_throwOnMissingNonNullableProperties) return type;

var missing = _recordTypeCache.GetNonNullableRecordProperties(type)
.Where(property => !json.TryGetValue(property.Name, out var value) || value.ValueKind == JsonValueKind.Null)
.Select(property => property.Name)
.ToList();

if (missing.Count > 0)
throw new RecordValidationException(
$"Error converting Json to {type}'.\n" +
$"One ore more non-nullable properties are missing or null: {string.Join(", ", missing.Select(property => $"{type.Name}.{property}"))}.\n" +
$"Either make properties nullable or use a RecordMigrator to handle {typeof(TRecord)} versioning.");

return type;
throw new JsonException("Could not deserialize Record Type");
}
}
20 changes: 20 additions & 0 deletions EventSourcing.EF.SqlAggregate/EventSourcing.EF.SqlAggregate.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\EventSourcing.Core\EventSourcing.Core.csproj" />
<ProjectReference Include="..\EventSourcing.EF\EventSourcing.EF.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.CodeAnalysis.CSharp" Version="4.2.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="6.0.5" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="6.0.4" />
</ItemGroup>

</Project>
9 changes: 9 additions & 0 deletions EventSourcing.EF.SqlAggregate/MigrationBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using Finaps.EventSourcing.EF.SqlAggregate;

namespace Microsoft.EntityFrameworkCore.Migrations;

public static class MigrationBuilderExtensions
{
public static void CreateSqlAggregate(this MigrationBuilder builder, string? assemblyQualifiedName) =>
builder.Operations.Add(new AddSqlAggregateOperation(assemblyQualifiedName));
}
10 changes: 10 additions & 0 deletions EventSourcing.EF.SqlAggregate/ModelBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using Finaps.EventSourcing.Core;
using Microsoft.EntityFrameworkCore;

namespace EventSourcing.EF.SqlAggregate;

public static class ModelBuilderExtensions
{
public static SqlAggregateBuilder<TAggregate, TSqlAggregate> Aggregate<TAggregate, TSqlAggregate>(this ModelBuilder builder)
where TAggregate : Aggregate, new() where TSqlAggregate : SQLAggregate, new() => new(builder);
}
14 changes: 14 additions & 0 deletions EventSourcing.EF.SqlAggregate/RecordContextExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using Finaps.EventSourcing.Core;
using Finaps.EventSourcing.EF;
using Microsoft.EntityFrameworkCore;

namespace EventSourcing.EF.SqlAggregate;

public static class RecordContextExtensions
{
public static IQueryable<TSqlAggregate> Aggregate<TAggregate, TSqlAggregate>(this RecordContext context)
where TAggregate : Aggregate, new() where TSqlAggregate : SQLAggregate, new() => context.Set<TSqlAggregate>().FromSqlRaw(
$@"SELECT ({typeof(TAggregate).Name}{typeof(TSqlAggregate).Name}Aggregate(e ORDER BY ""{nameof(Event.Index)}"")).*
FROM ""{typeof(TAggregate).EventTable()}"" AS e
GROUP BY ""{nameof(SQLAggregate.AggregateId)}""");
}
8 changes: 8 additions & 0 deletions EventSourcing.EF.SqlAggregate/SqlAggregate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace EventSourcing.EF.SqlAggregate;

public abstract record SQLAggregate
{
public Guid PartitionId { get; init; }
public Guid AggregateId { get; init; }
public long Version { get; init; }
}
175 changes: 175 additions & 0 deletions EventSourcing.EF.SqlAggregate/SqlAggregateBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
using System.Collections;
using System.Collections.Specialized;
using System.Linq.Expressions;
using System.Net;
using System.Net.NetworkInformation;
using System.Numerics;
using System.Reflection;
using System.Text.Json;
using Finaps.EventSourcing.Core;
using Finaps.EventSourcing.EF;
using Finaps.EventSourcing.EF.SqlAggregate;
using Microsoft.EntityFrameworkCore;
using NpgsqlTypes;

namespace EventSourcing.EF.SqlAggregate;


public abstract class SqlAggregateBuilder
{
internal static Dictionary<Type, Dictionary<Type, SqlAggregateBuilder>> Cache { get; } = new();

public abstract string SQL { get; }
}

public class SqlAggregateBuilder<TAggregate, TSqlAggregate> : SqlAggregateBuilder
where TAggregate : Aggregate, new()
where TSqlAggregate : SQLAggregate, new()
{
public override string SQL => $"{ApplyFunctionDefinition}\n{AggregateFunctionDefinition}";

private List<LambdaExpression> Clauses { get; } = new();

private static string EventTableName => typeof(TAggregate).EventTable();
private static string ApplyFunctionName => $"{typeof(TAggregate).Name}{typeof(TSqlAggregate).Name}Apply";
private static string AggregateFunctionName => $"{typeof(TAggregate).Name}{typeof(TSqlAggregate).Name}Aggregate";
private static string AggregateFunctionDefinition =>
$"CREATE AGGREGATE {AggregateFunctionName}(\"{EventTableName}\")\n" +
"(\n" +
$" sfunc = {ApplyFunctionName},\n" +
$" stype = \"{typeof(TSqlAggregate).Name}\",\n" +
$" initcond = '({string.Join(",", ConvertDefaultPropertyValues())})'\n" +
");";
private string ApplyFunctionDefinition =>
$"CREATE FUNCTION {ApplyFunctionName}({AggregateToken} \"{typeof(TSqlAggregate).Name}\", {EventToken} \"{EventTableName}\") " +
$"RETURNS \"{typeof(TSqlAggregate).Name}\"\n" +
$"RETURN CASE\n{string.Join("\n", Clauses.Select(ConvertClause))}\nELSE {AggregateToken}\nEND;";

private static IEnumerable<PropertyInfo> Properties => typeof(TSqlAggregate).GetProperties().OrderBy(x => x.Name);
private const string AggregateToken = "aggregate";
private const string EventToken = "event";

public SqlAggregateBuilder(ModelBuilder builder)
{
if (!Cache.ContainsKey(typeof(TSqlAggregate)))
Cache.Add(typeof(TSqlAggregate), new Dictionary<Type, SqlAggregateBuilder>());

Cache[typeof(TSqlAggregate)].TryAdd(typeof(TAggregate), this);

builder.Entity<TSqlAggregate>()
.HasKey(x => new { x.PartitionId, x.AggregateId });

foreach (var (property, i) in typeof(TSqlAggregate).GetProperties().OrderBy(x => x.Name).Select((info, i) => (info, i)))
builder.Entity<TSqlAggregate>()
.Property(property.Name).HasColumnOrder(i);
}

public SqlAggregateBuilder<TAggregate, TSqlAggregate> Apply<TEvent>(
Expression<Func<TSqlAggregate, TEvent, TSqlAggregate>> expression)
where TEvent : Event<TAggregate>
{
Clauses.Add(expression);
return this;
}

private static IEnumerable<string> ConvertDefaultPropertyValues() => Properties
.Select(x => ConvertDefaultValue(x.PropertyType));

private static string ConvertDefaultValue(Type type)
{
if (ConstructorTypeToSqlDefaultValue.TryGetValue(type, out var result))
return result?.ToString() ?? "null";

throw new NotSupportedException($"Type {type} is not supported");
}

private static string ConvertClause(LambdaExpression expression) =>
$"WHEN {EventToken}.\"{nameof(Event.Type)}\" = '{expression.Parameters.Last().Type.Name}' THEN {new SqlAggregateExpressionConverter().Convert(expression)}";

private static readonly Dictionary<Type, object?> ConstructorTypeToSqlDefaultValue = new()
{
// Numeric types
{ typeof(byte), default(byte) },
{ typeof(short), default(short) },
{ typeof(int), default(int) },
{ typeof(long), default(long) },
{ typeof(float), default(float) },
{ typeof(double), default(double) },
{ typeof(decimal), default(decimal) },
{ typeof(BigInteger), default(BigInteger) },

// Text types
{ typeof(string), default(string) },
{ typeof(char[]), default(char[]) },
{ typeof(char), default(char[]) },
{ typeof(ArraySegment<char>), default(ArraySegment<char>) },
{ typeof(JsonDocument), default(JsonDocument) },

// Date/time types
// The DateTime entry is for LegacyTimestampBehavior mode only. In regular mode we resolve through
// ResolveValueDependentValue below
{ typeof(DateTime), default(DateTime) },
{ typeof(DateTimeOffset), default(DateTimeOffset) },
{ typeof(DateOnly), default(DateOnly) },
{ typeof(TimeOnly), default(TimeOnly) },
{ typeof(TimeSpan), default(TimeSpan) },
{ typeof(NpgsqlInterval), default(NpgsqlInterval) },

// Network types
{ typeof(IPAddress), default(IPAddress) },
// See ReadOnlyIPAddress below
{ typeof((IPAddress Address, int Subnet)), default((IPAddress, int)) },
#pragma warning disable 618
{ typeof(NpgsqlInet), default(NpgsqlInet) },
#pragma warning restore 618
{ typeof(PhysicalAddress), default(PhysicalAddress) },

// Full-text types
{ typeof(NpgsqlTsVector), default(NpgsqlTsVector) },
{ typeof(NpgsqlTsQueryLexeme), default(NpgsqlTsQueryLexeme) },
{ typeof(NpgsqlTsQueryAnd), default(NpgsqlTsQueryAnd) },
{ typeof(NpgsqlTsQueryOr), default(NpgsqlTsQueryOr) },
{ typeof(NpgsqlTsQueryNot), default(NpgsqlTsQueryNot) },
{ typeof(NpgsqlTsQueryEmpty), default(NpgsqlTsQueryEmpty) },
{ typeof(NpgsqlTsQueryFollowedBy), default(NpgsqlTsQueryFollowedBy) },

// Geometry types
{ typeof(NpgsqlBox), default(NpgsqlBox) },
{ typeof(NpgsqlCircle), default(NpgsqlCircle) },
{ typeof(NpgsqlLine), default(NpgsqlLine) },
{ typeof(NpgsqlLSeg), default(NpgsqlLSeg) },
{ typeof(NpgsqlPath), default(NpgsqlPath) },
{ typeof(NpgsqlPoint), default(NpgsqlPoint) },
{ typeof(NpgsqlPolygon), default(NpgsqlPolygon) },

// Misc types
{ typeof(bool), default(bool) },
{ typeof(byte[]), default(byte[]) },
{ typeof(ArraySegment<byte>), default(ArraySegment<byte>) },
{ typeof(Guid), default(Guid) },
{ typeof(BitArray), default(BitArray) },
{ typeof(BitVector32), default(BitVector32) },
{ typeof(Dictionary<string, string>), default(Dictionary<string, string>) },

// Internal types
{ typeof(NpgsqlLogSequenceNumber), default(NpgsqlLogSequenceNumber) },
{ typeof(NpgsqlTid), default(NpgsqlTid) },
{ typeof(DBNull), default(DBNull) },

// Built-in range types
{ typeof(NpgsqlRange<int>), default(NpgsqlRange<int>) },
{ typeof(NpgsqlRange<long>), default(NpgsqlRange<long>) },
{ typeof(NpgsqlRange<decimal>), default(NpgsqlRange<decimal>) },
{ typeof(NpgsqlRange<DateOnly>), default(NpgsqlRange<DateOnly>) },

// Built-in multirange types
{ typeof(NpgsqlRange<int>[]), default(NpgsqlRange<int>[]) },
{ typeof(List<NpgsqlRange<int>>), default(List<NpgsqlRange<int>>) },
{ typeof(NpgsqlRange<long>[]), default(NpgsqlRange<long>[]) },
{ typeof(List<NpgsqlRange<long>>), default(List<NpgsqlRange<long>>) },
{ typeof(NpgsqlRange<decimal>[]), default(NpgsqlRange<decimal>[]) },
{ typeof(List<NpgsqlRange<decimal>>), default(List<NpgsqlRange<decimal>>) },
{ typeof(NpgsqlRange<DateOnly>[]), default(NpgsqlRange<DateOnly>[]) },
{ typeof(List<NpgsqlRange<DateOnly>>), default(List<NpgsqlRange<DateOnly>>) }
};
}
Loading