From b07b967714095697341c46026cb54e92f7f1ad3f Mon Sep 17 00:00:00 2001 From: nomba Date: Sun, 27 Aug 2023 16:24:42 +0300 Subject: [PATCH] Accounting logic, TaskCreatedV2 start --- src/Accounting/AccountingDbContext.cs | 2 + src/Accounting/AccountingDbContextSeeder.cs | 1 + src/Accounting/Domain/Account.cs | 41 ++++++++++++++++++ .../Events/TransactionCommittedDomainEvent.cs | 13 ++++++ src/Accounting/Domain/Task.cs | 9 ++++ src/Accounting/Domain/Transaction.cs | 15 +++++++ .../MediatrBasedMessageBrokerEventConsumer.cs | 13 ++++++ .../PopugCreatedStreamingEvent.cs | 25 +++++++++++ .../PopugCreatedStreamingEventConsumer.cs | 23 ++++++++++ .../TaskCompletedBehaviourEvent.cs | 2 +- .../TaskCompletedBehaviourEventConsumer.cs | 40 +++++++++++++++++ .../TaskCreatedBehaviourEvent.cs | 7 +-- .../TaskCreatedBehaviourEventConsumer.cs | 41 ++++++++++++++++++ .../TaskCreatedStreamingEvent.cs | 22 ++++++++++ .../TaskCreatedStreamingEventV2.cs | 23 ++++++++++ .../TaskReassignedBehaviourEvent.cs | 3 +- .../TaskReassignedBehaviourEventConsumer.cs | 40 +++++++++++++++++ .../TransactionCommittedBehaviourEvent.cs | 19 ++++++++ ...nsactionCommittedBehaviourEventProducer.cs | 24 +++++++++++ .../Integration/IKafkaMessageProducer.cs | 35 +++++++++++++++ .../Integration/KafkaMessageProducer.cs | 43 +++++++++++++++++++ .../MessageBrokerDomainEventHandler.cs | 33 ++++++++++++++ src/Accounting/MessageBrokerEvent.cs | 41 ++++++++++++++++++ src/Accounting/MessageBrokerEventName.cs | 13 ++++++ src/Accounting/MessageBrokerEventType.cs | 7 +++ .../PopugCreatedStreamingEventProducer.cs | 2 +- src/TaskTracker/Domain/Popug.cs | 2 +- .../Endpoints/CreateTaskEndpoint.cs | 20 --------- .../MediatrBasedMessageBrokerEventConsumer.cs | 13 ++++++ .../PopugCreatedStreamingEvent.cs | 25 +++++++++++ .../PopugCreatedStreamingEventConsumer.cs | 23 ++++++++++ .../RoleChangedBehaviourEvent.cs | 20 +++++++++ .../RoleChangedBehaviourEventConsumer.cs | 29 +++++++++++++ .../TaskCompletedBehaviourEvent.cs | 17 ++++++++ .../TaskCompletedBehaviourEventProducer.cs | 21 +++++++++ .../TaskCreatedBehaviourEvent.cs | 17 ++++++++ .../TaskCreatedBehaviourEventProducer.cs | 8 +--- .../TaskCreatedStreamingEvent.cs | 22 ++++++++++ .../TaskCreatedStreamingEventProducer.cs | 26 +++++++++++ .../TaskCreatedStreamingEventV2.cs | 23 ++++++++++ .../TaskReassignedBehaviourEvent.cs | 17 ++++++++ .../TaskReassignedBehaviourEventProducer.cs | 23 ++++++++++ .../TaskUpdatedStreamingEvent.cs | 2 +- .../Integration/IKafkaMessageProducer.cs | 29 +++++++++++++ .../Integration/KafkaMessageHandler.cs | 25 ++++++----- .../Integration/KafkaMessageListener.cs | 9 ++-- .../Integration/MessageBrokerEvent.cs | 8 ++-- .../ResilientKafkaMessageHandler.cs | 28 ++++++++++++ 48 files changed, 885 insertions(+), 59 deletions(-) create mode 100644 src/Accounting/Domain/Account.cs create mode 100644 src/Accounting/Domain/Events/TransactionCommittedDomainEvent.cs create mode 100644 src/Accounting/Domain/Task.cs create mode 100644 src/Accounting/Domain/Transaction.cs create mode 100644 src/Accounting/Integration/EventConsuming/MediatrBasedMessageBrokerEventConsumer.cs create mode 100644 src/Accounting/Integration/EventConsuming/PopugCreatedStreamingEvent.cs create mode 100644 src/Accounting/Integration/EventConsuming/PopugCreatedStreamingEventConsumer.cs rename src/{TaskTracker/Integration/Events => Accounting/Integration/EventConsuming}/TaskCompletedBehaviourEvent.cs (91%) create mode 100644 src/Accounting/Integration/EventConsuming/TaskCompletedBehaviourEventConsumer.cs rename src/{TaskTracker/Integration/Events => Accounting/Integration/EventConsuming}/TaskCreatedBehaviourEvent.cs (66%) create mode 100644 src/Accounting/Integration/EventConsuming/TaskCreatedBehaviourEventConsumer.cs create mode 100644 src/Accounting/Integration/EventConsuming/TaskCreatedStreamingEvent.cs create mode 100644 src/Accounting/Integration/EventConsuming/TaskCreatedStreamingEventV2.cs rename src/{TaskTracker/Integration/Events => Accounting/Integration/EventConsuming}/TaskReassignedBehaviourEvent.cs (84%) create mode 100644 src/Accounting/Integration/EventConsuming/TaskReassignedBehaviourEventConsumer.cs create mode 100644 src/Accounting/Integration/EventProducing/TransactionCommittedBehaviourEvent.cs create mode 100644 src/Accounting/Integration/EventProducing/TransactionCommittedBehaviourEventProducer.cs create mode 100644 src/Accounting/Integration/IKafkaMessageProducer.cs create mode 100644 src/Accounting/Integration/KafkaMessageProducer.cs create mode 100644 src/Accounting/Integration/MessageBrokerDomainEventHandler.cs create mode 100644 src/Accounting/MessageBrokerEvent.cs create mode 100644 src/Accounting/MessageBrokerEventName.cs create mode 100644 src/Accounting/MessageBrokerEventType.cs create mode 100644 src/TaskTracker/Integration/EventConsuming/MediatrBasedMessageBrokerEventConsumer.cs create mode 100644 src/TaskTracker/Integration/EventConsuming/PopugCreatedStreamingEvent.cs create mode 100644 src/TaskTracker/Integration/EventConsuming/PopugCreatedStreamingEventConsumer.cs create mode 100644 src/TaskTracker/Integration/EventConsuming/RoleChangedBehaviourEvent.cs create mode 100644 src/TaskTracker/Integration/EventConsuming/RoleChangedBehaviourEventConsumer.cs create mode 100644 src/TaskTracker/Integration/EventProducing/TaskCompletedBehaviourEvent.cs create mode 100644 src/TaskTracker/Integration/EventProducing/TaskCompletedBehaviourEventProducer.cs create mode 100644 src/TaskTracker/Integration/EventProducing/TaskCreatedBehaviourEvent.cs rename src/TaskTracker/Integration/{Events => EventProducing}/TaskCreatedBehaviourEventProducer.cs (68%) create mode 100644 src/TaskTracker/Integration/EventProducing/TaskCreatedStreamingEvent.cs create mode 100644 src/TaskTracker/Integration/EventProducing/TaskCreatedStreamingEventProducer.cs create mode 100644 src/TaskTracker/Integration/EventProducing/TaskCreatedStreamingEventV2.cs create mode 100644 src/TaskTracker/Integration/EventProducing/TaskReassignedBehaviourEvent.cs create mode 100644 src/TaskTracker/Integration/EventProducing/TaskReassignedBehaviourEventProducer.cs rename src/TaskTracker/Integration/{Events => EventProducing}/TaskUpdatedStreamingEvent.cs (93%) create mode 100644 src/TaskTracker/Integration/ResilientKafkaMessageHandler.cs diff --git a/src/Accounting/AccountingDbContext.cs b/src/Accounting/AccountingDbContext.cs index e3e29a6..4cab725 100644 --- a/src/Accounting/AccountingDbContext.cs +++ b/src/Accounting/AccountingDbContext.cs @@ -16,6 +16,8 @@ public AccountingDbContext(DbContextOptions options, IPubli _logger = logger; } + public DbSet Accounts { get; set; } + public DbSet Tasks { get; set; } public DbSet Pogugs { get; set; } protected override void OnModelCreating(ModelBuilder modelBuilder) diff --git a/src/Accounting/AccountingDbContextSeeder.cs b/src/Accounting/AccountingDbContextSeeder.cs index 4c5379b..d611342 100644 --- a/src/Accounting/AccountingDbContextSeeder.cs +++ b/src/Accounting/AccountingDbContextSeeder.cs @@ -1,5 +1,6 @@ using Accounting.Domain; using Microsoft.Extensions.Options; +using Task = System.Threading.Tasks.Task; namespace Accounting; diff --git a/src/Accounting/Domain/Account.cs b/src/Accounting/Domain/Account.cs new file mode 100644 index 0000000..5609df4 --- /dev/null +++ b/src/Accounting/Domain/Account.cs @@ -0,0 +1,41 @@ +using Accounting.Domain.Events; + +namespace Accounting.Domain; + +public class Account : Entity +{ + private readonly List _transactions = new(); + + public Account(Popug owner) + { + Owner = owner; + } + + public Popug Owner { get; } + public double Balance { get; private set; } + public IReadOnlyCollection Transactions => _transactions.AsReadOnly(); + + public void Deposit(double amount, string description) + { + if (amount <= 0) + throw new ArgumentException(nameof(amount)); + + var deposit = new Transaction(description, 0, amount); + _transactions.Add(deposit); + + Balance += amount; + _domainEvents.Add(new TransactionCommittedDomainEvent(this, deposit)); + } + + public void Withdraw(double amount, string description) + { + if (amount <= 0) + throw new ArgumentException(nameof(amount)); + + var withdrawal = new Transaction(description, amount, 0); + _transactions.Add(withdrawal); + + Balance -= amount; + _domainEvents.Add(new TransactionCommittedDomainEvent(this, withdrawal)); + } +} \ No newline at end of file diff --git a/src/Accounting/Domain/Events/TransactionCommittedDomainEvent.cs b/src/Accounting/Domain/Events/TransactionCommittedDomainEvent.cs new file mode 100644 index 0000000..3e336d4 --- /dev/null +++ b/src/Accounting/Domain/Events/TransactionCommittedDomainEvent.cs @@ -0,0 +1,13 @@ +namespace Accounting.Domain.Events; + +internal class TransactionCommittedDomainEvent : DomainEvent +{ + public TransactionCommittedDomainEvent(Account account, Transaction transaction) + { + Account = account; + Transaction = transaction; + } + + public Account Account { get; } + public Transaction Transaction { get; } +} \ No newline at end of file diff --git a/src/Accounting/Domain/Task.cs b/src/Accounting/Domain/Task.cs new file mode 100644 index 0000000..ead8ca9 --- /dev/null +++ b/src/Accounting/Domain/Task.cs @@ -0,0 +1,9 @@ +namespace Accounting.Domain; + +public class Task : Entity +{ + public string Title { get; set; } + public string Description { get; set; } + public double Fee { get; set; } + public double Reward { get; set; } +} \ No newline at end of file diff --git a/src/Accounting/Domain/Transaction.cs b/src/Accounting/Domain/Transaction.cs new file mode 100644 index 0000000..4284dcf --- /dev/null +++ b/src/Accounting/Domain/Transaction.cs @@ -0,0 +1,15 @@ +namespace Accounting.Domain; + +public class Transaction : Entity +{ + public Transaction(string description, double credit, double debit) + { + Description = description; + Credit = credit; + Debit = debit; + } + + public string Description { get; } + public double Credit { get; } + public double Debit { get; } +} \ No newline at end of file diff --git a/src/Accounting/Integration/EventConsuming/MediatrBasedMessageBrokerEventConsumer.cs b/src/Accounting/Integration/EventConsuming/MediatrBasedMessageBrokerEventConsumer.cs new file mode 100644 index 0000000..5b1026e --- /dev/null +++ b/src/Accounting/Integration/EventConsuming/MediatrBasedMessageBrokerEventConsumer.cs @@ -0,0 +1,13 @@ +using MediatR; + +namespace Accounting.Integration.EventConsuming; + +internal abstract class MediatrBasedMessageBrokerEventConsumer : INotificationHandler where TMessageBrokerEvent : MessageBrokerEvent +{ + protected abstract Task Consume(TMessageBrokerEvent @event, CancellationToken cancellationToken); + + public async Task Handle(TMessageBrokerEvent notification, CancellationToken cancellationToken) + { + await Consume(notification, cancellationToken); + } +} \ No newline at end of file diff --git a/src/Accounting/Integration/EventConsuming/PopugCreatedStreamingEvent.cs b/src/Accounting/Integration/EventConsuming/PopugCreatedStreamingEvent.cs new file mode 100644 index 0000000..d3861c8 --- /dev/null +++ b/src/Accounting/Integration/EventConsuming/PopugCreatedStreamingEvent.cs @@ -0,0 +1,25 @@ +using Accounting.Domain; + +namespace Accounting.Integration.EventConsuming; + +public class PopugCreatedStreamingEvent : MessageBrokerEvent +{ + private static readonly MessageBrokerEventName EventName = new("Users", "Created"); + private static readonly MessageBrokerEventType EventType = MessageBrokerEventType.Streaming; + + public PopugCreatedStreamingEvent(Guid id, DateTime createdAt, DataType data) : base(id, EventName, EventType, 1, createdAt, data) + { + } + + // public PopugCreatedStreamingEvent(DataType data, Guid id, MessageBrokerEventName name, int version, DateTime createdAt) : base(data, id, name, version, createdAt) + // { + // } + + public class DataType + { + public Guid Id { get; set; } + public string Username { get; set; } + public string FullName { get; set; } + public RoleType Role { get; set; } + } +} \ No newline at end of file diff --git a/src/Accounting/Integration/EventConsuming/PopugCreatedStreamingEventConsumer.cs b/src/Accounting/Integration/EventConsuming/PopugCreatedStreamingEventConsumer.cs new file mode 100644 index 0000000..399bb7b --- /dev/null +++ b/src/Accounting/Integration/EventConsuming/PopugCreatedStreamingEventConsumer.cs @@ -0,0 +1,23 @@ +using Accounting.Domain; +using Task = System.Threading.Tasks.Task; + +namespace Accounting.Integration.EventConsuming; + +internal class PopugCreatedStreamingEventConsumer : MediatrBasedMessageBrokerEventConsumer +{ + private readonly AccountingDbContext _dbContext; + + public PopugCreatedStreamingEventConsumer(AccountingDbContext dbContext) + { + _dbContext = dbContext; + } + + protected override async Task Consume(PopugCreatedStreamingEvent @event, CancellationToken cancellationToken) + { + if (@event.Data is not PopugCreatedStreamingEvent.DataType eventData) + throw new InvalidCastException(); + + _dbContext.Pogugs.Add(new Popug(eventData.Username, eventData.FullName, eventData.Role)); + await _dbContext.SaveChangesAsync(cancellationToken); + } +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/Events/TaskCompletedBehaviourEvent.cs b/src/Accounting/Integration/EventConsuming/TaskCompletedBehaviourEvent.cs similarity index 91% rename from src/TaskTracker/Integration/Events/TaskCompletedBehaviourEvent.cs rename to src/Accounting/Integration/EventConsuming/TaskCompletedBehaviourEvent.cs index 6a6e221..e251ac7 100644 --- a/src/TaskTracker/Integration/Events/TaskCompletedBehaviourEvent.cs +++ b/src/Accounting/Integration/EventConsuming/TaskCompletedBehaviourEvent.cs @@ -1,4 +1,4 @@ -namespace TaskTracker.Integration.Events; +namespace Accounting.Integration.EventConsuming; public class TaskCompletedBehaviourEvent : MessageBrokerEvent { diff --git a/src/Accounting/Integration/EventConsuming/TaskCompletedBehaviourEventConsumer.cs b/src/Accounting/Integration/EventConsuming/TaskCompletedBehaviourEventConsumer.cs new file mode 100644 index 0000000..f38eba4 --- /dev/null +++ b/src/Accounting/Integration/EventConsuming/TaskCompletedBehaviourEventConsumer.cs @@ -0,0 +1,40 @@ +using Accounting.Domain; +using Microsoft.EntityFrameworkCore; +using Task = System.Threading.Tasks.Task; + +namespace Accounting.Integration.EventConsuming; + +internal class TaskCompletedBehaviourEventConsumer : MediatrBasedMessageBrokerEventConsumer +{ + private readonly AccountingDbContext _dbContext; + + public TaskCompletedBehaviourEventConsumer(AccountingDbContext dbContext) + { + _dbContext = dbContext; + } + + protected override async Task Consume(TaskCompletedBehaviourEvent @event, CancellationToken cancellationToken) + { + if (@event.Data is not TaskCompletedBehaviourEvent.DataType eventData) + throw new InvalidCastException(); + + // Find account for popug + + var account = await _dbContext.Accounts.FirstOrDefaultAsync(a => a.Owner.Id == eventData.AssigneeId, cancellationToken); + + if (account is null) + throw new InvalidOperationException("Account not found"); + + // Get task price + + var task = await _dbContext.Tasks.FirstOrDefaultAsync(t => t.Id == eventData.Id, cancellationToken); + + if (task is null) + throw new InvalidOperationException("Task not found"); + + // Give popug reward + + account.Deposit(task.Reward, $"Reward for {eventData.Id}"); + await _dbContext.SaveChangesAsync(cancellationToken); + } +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/Events/TaskCreatedBehaviourEvent.cs b/src/Accounting/Integration/EventConsuming/TaskCreatedBehaviourEvent.cs similarity index 66% rename from src/TaskTracker/Integration/Events/TaskCreatedBehaviourEvent.cs rename to src/Accounting/Integration/EventConsuming/TaskCreatedBehaviourEvent.cs index 075cd08..68eda0a 100644 --- a/src/TaskTracker/Integration/Events/TaskCreatedBehaviourEvent.cs +++ b/src/Accounting/Integration/EventConsuming/TaskCreatedBehaviourEvent.cs @@ -1,4 +1,4 @@ -namespace TaskTracker.Integration.Events; +namespace Accounting.Integration.EventConsuming; public class TaskCreatedBehaviourEvent : MessageBrokerEvent { @@ -12,11 +12,6 @@ public TaskCreatedBehaviourEvent(Guid id, DateTime createdAt, DataType data) : b public class DataType { public Guid Id { get; set; } - public string Title { get; set; } - public string Description { get; set; } - public double Fee { get; set; } - public double Reward { get; set; } - public string Status { get; set; } public Guid AssigneeId { get; set; } } } \ No newline at end of file diff --git a/src/Accounting/Integration/EventConsuming/TaskCreatedBehaviourEventConsumer.cs b/src/Accounting/Integration/EventConsuming/TaskCreatedBehaviourEventConsumer.cs new file mode 100644 index 0000000..8addc94 --- /dev/null +++ b/src/Accounting/Integration/EventConsuming/TaskCreatedBehaviourEventConsumer.cs @@ -0,0 +1,41 @@ +using Accounting.Domain; +using Microsoft.EntityFrameworkCore; +using Task = System.Threading.Tasks.Task; + +namespace Accounting.Integration.EventConsuming; + +internal class TaskCreatedBehaviourEventConsumer : MediatrBasedMessageBrokerEventConsumer +{ + private readonly AccountingDbContext _dbContext; + + public TaskCreatedBehaviourEventConsumer(AccountingDbContext dbContext) + { + _dbContext = dbContext; + } + + // TODO: Introduce Commands layer because withdrawal is duplicated in TaskReassignedHandler + protected override async Task Consume(TaskCreatedBehaviourEvent @event, CancellationToken cancellationToken) + { + if (@event.Data is not TaskCreatedBehaviourEvent.DataType eventData) + throw new InvalidCastException(); + + // Find account for popug + + var account = await _dbContext.Accounts.FirstOrDefaultAsync(a => a.Owner.Id == eventData.AssigneeId, cancellationToken); + + if (account is null) + throw new InvalidOperationException("Account not found"); + + // Get task price + + var task = await _dbContext.Tasks.FirstOrDefaultAsync(t => t.Id == eventData.Id, cancellationToken); + + if (task is null) + throw new InvalidOperationException("Task not found"); + + // Take fee from popug + + account.Withdraw(task.Fee, $"Fee for {eventData.Id} assigned"); + await _dbContext.SaveChangesAsync(cancellationToken); + } +} \ No newline at end of file diff --git a/src/Accounting/Integration/EventConsuming/TaskCreatedStreamingEvent.cs b/src/Accounting/Integration/EventConsuming/TaskCreatedStreamingEvent.cs new file mode 100644 index 0000000..76f5b7c --- /dev/null +++ b/src/Accounting/Integration/EventConsuming/TaskCreatedStreamingEvent.cs @@ -0,0 +1,22 @@ +namespace Accounting.Integration.EventConsuming; + +public class TaskCreatedStreamingEvent : MessageBrokerEvent +{ + private static readonly MessageBrokerEventName EventName = new("Tasks", "Created"); + private static readonly MessageBrokerEventType EventType = MessageBrokerEventType.Streaming; + + public TaskCreatedStreamingEvent(Guid id, DateTime createdAt, DataType data) : base(id, EventName, EventType, 2, createdAt, data) + { + } + + public class DataType + { + public Guid Id { get; set; } + public string Title { get; set; } + public string Description { get; set; } + public double Fee { get; set; } + public double Reward { get; set; } + public TaskStatus Status { get; set; } + public Guid AssigneeId { get; set; } + } +} \ No newline at end of file diff --git a/src/Accounting/Integration/EventConsuming/TaskCreatedStreamingEventV2.cs b/src/Accounting/Integration/EventConsuming/TaskCreatedStreamingEventV2.cs new file mode 100644 index 0000000..e02b5aa --- /dev/null +++ b/src/Accounting/Integration/EventConsuming/TaskCreatedStreamingEventV2.cs @@ -0,0 +1,23 @@ +namespace Accounting.Integration.EventConsuming; + +public class TaskCreatedStreamingEventV2 : MessageBrokerEvent +{ + private static readonly MessageBrokerEventName EventName = new("Tasks", "Created"); + private static readonly MessageBrokerEventType EventType = MessageBrokerEventType.Streaming; + + public TaskCreatedStreamingEventV2(Guid id, DateTime createdAt, DataType data) : base(id, EventName, EventType, 2, createdAt, data) + { + } + + public class DataType + { + public Guid Id { get; set; } + public string JiraId { get; set; } + public string Title { get; set; } + public string Description { get; set; } + public double Fee { get; set; } + public double Reward { get; set; } + public string Status { get; set; } + public Guid AssigneeId { get; set; } + } +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/Events/TaskReassignedBehaviourEvent.cs b/src/Accounting/Integration/EventConsuming/TaskReassignedBehaviourEvent.cs similarity index 84% rename from src/TaskTracker/Integration/Events/TaskReassignedBehaviourEvent.cs rename to src/Accounting/Integration/EventConsuming/TaskReassignedBehaviourEvent.cs index ba13f05..f14b09c 100644 --- a/src/TaskTracker/Integration/Events/TaskReassignedBehaviourEvent.cs +++ b/src/Accounting/Integration/EventConsuming/TaskReassignedBehaviourEvent.cs @@ -1,4 +1,4 @@ -namespace TaskTracker.Integration.Events; +namespace Accounting.Integration.EventConsuming; public class TaskReassignedBehaviourEvent : MessageBrokerEvent { @@ -13,6 +13,5 @@ public class DataType { public Guid Id { get; set; } public Guid NewAssigneeId { get; set; } - public Guid PreviousAssigneeId { get; set; } } } \ No newline at end of file diff --git a/src/Accounting/Integration/EventConsuming/TaskReassignedBehaviourEventConsumer.cs b/src/Accounting/Integration/EventConsuming/TaskReassignedBehaviourEventConsumer.cs new file mode 100644 index 0000000..a168e7b --- /dev/null +++ b/src/Accounting/Integration/EventConsuming/TaskReassignedBehaviourEventConsumer.cs @@ -0,0 +1,40 @@ +using Accounting.Domain; +using Microsoft.EntityFrameworkCore; +using Task = System.Threading.Tasks.Task; + +namespace Accounting.Integration.EventConsuming; + +internal class TaskReassignedBehaviourEventConsumer : MediatrBasedMessageBrokerEventConsumer +{ + private readonly AccountingDbContext _dbContext; + + public TaskReassignedBehaviourEventConsumer(AccountingDbContext dbContext) + { + _dbContext = dbContext; + } + + protected override async Task Consume(TaskReassignedBehaviourEvent @event, CancellationToken cancellationToken) + { + if (@event.Data is not TaskReassignedBehaviourEvent.DataType eventData) + throw new InvalidCastException(); + + // Find account for popug + + var account = await _dbContext.Accounts.FirstOrDefaultAsync(a => a.Owner.Id == eventData.NewAssigneeId, cancellationToken); + + if (account is null) + throw new InvalidOperationException("Account not found"); + + // Get task price + + var task = await _dbContext.Tasks.FirstOrDefaultAsync(t => t.Id == eventData.Id, cancellationToken); + + if (task is null) + throw new InvalidOperationException("Task not found"); + + // Take fee from popug + + account.Withdraw(task.Fee, $"Fee for {eventData.Id} assigned"); + await _dbContext.SaveChangesAsync(cancellationToken); + } +} \ No newline at end of file diff --git a/src/Accounting/Integration/EventProducing/TransactionCommittedBehaviourEvent.cs b/src/Accounting/Integration/EventProducing/TransactionCommittedBehaviourEvent.cs new file mode 100644 index 0000000..6c9954c --- /dev/null +++ b/src/Accounting/Integration/EventProducing/TransactionCommittedBehaviourEvent.cs @@ -0,0 +1,19 @@ +namespace Accounting.Integration.EventProducing; + +public class TransactionCommittedBehaviourEvent : MessageBrokerEvent +{ + private static readonly MessageBrokerEventName EventName = new("Accounts", "TransactionCommitted"); + private static readonly MessageBrokerEventType EventType = MessageBrokerEventType.Behaviour; + + public TransactionCommittedBehaviourEvent(Guid id, DateTime createdAt, DataType data) : base(id, EventName, EventType, 1, createdAt, data) + { + } + + public class DataType + { + public Guid Id { get; set; } + public string Description { get; set; } + public double Credit { get; set; } + public double Debit { get; set; } + } +} \ No newline at end of file diff --git a/src/Accounting/Integration/EventProducing/TransactionCommittedBehaviourEventProducer.cs b/src/Accounting/Integration/EventProducing/TransactionCommittedBehaviourEventProducer.cs new file mode 100644 index 0000000..da242f9 --- /dev/null +++ b/src/Accounting/Integration/EventProducing/TransactionCommittedBehaviourEventProducer.cs @@ -0,0 +1,24 @@ +using Accounting.Domain.Events; +using Accounting.Integration.EventConsuming; + +namespace Accounting.Integration.EventProducing; + +internal class TransactionCommittedBehaviourEventProducer : MessageBrokerDomainEventHandler +{ + public TransactionCommittedBehaviourEventProducer(IKafkaMessageProducer kafkaMessageProducer) : base(kafkaMessageProducer) + { + } + + protected override TransactionCommittedBehaviourEvent MapToMessageBrokerEvent(TransactionCommittedDomainEvent domainEvent) + { + return new TransactionCommittedBehaviourEvent(Guid.NewGuid(), DateTime.UtcNow, new TransactionCommittedBehaviourEvent.DataType + { + Id = domainEvent.Transaction.Id, + Description = domainEvent.Transaction.Description, + Credit = domainEvent.Transaction.Credit, + Debit = domainEvent.Transaction.Debit + }); + } + + protected override string GetTopic() => "account-life-cycle"; +} \ No newline at end of file diff --git a/src/Accounting/Integration/IKafkaMessageProducer.cs b/src/Accounting/Integration/IKafkaMessageProducer.cs new file mode 100644 index 0000000..e647d68 --- /dev/null +++ b/src/Accounting/Integration/IKafkaMessageProducer.cs @@ -0,0 +1,35 @@ +namespace Accounting.Integration; + +internal interface IKafkaMessageProducer : IDisposable +{ + Task Produce(string topic, string message, CancellationToken cancellationToken); +} + +internal class ResistantKafkaMessageProducer : IKafkaMessageProducer +{ + private readonly IKafkaMessageProducer _kafkaMessageProducer; + private readonly ILogger _logger; + + public ResistantKafkaMessageProducer(IKafkaMessageProducer kafkaMessageProducer, ILogger logger) + { + _kafkaMessageProducer = kafkaMessageProducer; + _logger = logger; + } + + public void Dispose() + { + _kafkaMessageProducer.Dispose(); + } + + public async Task Produce(string topic, string message, CancellationToken cancellationToken) + { + // TODO: Use Poly for retry policy + + var messageHash = message.GetHashCode(); + _logger.LogInformation("Sending {Message} to {Topic} [{MessageHash}]..", topic, message, messageHash); + + await _kafkaMessageProducer.Produce(topic, message, cancellationToken); + + _logger.LogInformation("Message [{MessageHash}] to {Topic} sent", messageHash, topic); + } +} \ No newline at end of file diff --git a/src/Accounting/Integration/KafkaMessageProducer.cs b/src/Accounting/Integration/KafkaMessageProducer.cs new file mode 100644 index 0000000..d113b3d --- /dev/null +++ b/src/Accounting/Integration/KafkaMessageProducer.cs @@ -0,0 +1,43 @@ +using Accounting.Integration; +using Confluent.Kafka; + +namespace TaskTracker.Integration; + +internal class KafkaMessageProducer : IKafkaMessageProducer +{ + private readonly ILogger _logger; + private readonly IProducer _producer; + + public KafkaMessageProducer(ILogger logger) + { + _logger = logger; + + var config = new ProducerConfig + { + BootstrapServers = "localhost:9092", + }; + + _producer = new ProducerBuilder(config).Build(); + } + + public async Task Produce(string topic, string message, CancellationToken cancellationToken) + { + if (topic == null) throw new ArgumentNullException(nameof(topic)); + if (message == null) throw new ArgumentNullException(nameof(message)); + + var messageHash = message.GetHashCode(); + _logger.LogInformation("Sending {Message} to {Topic} [{MessageHash}]..", topic, message, messageHash); + + await _producer.ProduceAsync(topic, new Message + { + Value = message + }, cancellationToken); + + _logger.LogInformation("Message [{MessageHash}] to {Topic} sent", messageHash, topic); + } + + public void Dispose() + { + _producer.Dispose(); + } +} \ No newline at end of file diff --git a/src/Accounting/Integration/MessageBrokerDomainEventHandler.cs b/src/Accounting/Integration/MessageBrokerDomainEventHandler.cs new file mode 100644 index 0000000..eb60cc1 --- /dev/null +++ b/src/Accounting/Integration/MessageBrokerDomainEventHandler.cs @@ -0,0 +1,33 @@ +using System.Text.Json; +using MediatR; + +namespace Accounting.Integration; + +internal abstract class MessageBrokerDomainEventHandler : INotificationHandler + where TDomainEvent : DomainEvent + where TMessageBrokerEvent : MessageBrokerEvent +{ + private readonly IKafkaMessageProducer _kafkaMessageProducer; + + public MessageBrokerDomainEventHandler(IKafkaMessageProducer kafkaMessageProducer) + { + _kafkaMessageProducer = kafkaMessageProducer; + } + + public async Task Handle(TDomainEvent domainEvent, CancellationToken cancellationToken) + { + if (MapToMessageBrokerEvent(domainEvent) is not { } messageBrokerEvent) + throw new InvalidOperationException($"Unable to map {typeof(TDomainEvent)} to {typeof(TMessageBrokerEvent)}"); + + // Serialize to Json and validate with Schema Registry + var eventJson = JsonSerializer.Serialize(messageBrokerEvent); + + // TODO: Fix `Json.Schema.JsonSchemaException: Cannot resolve custom meta-schema. Make sure meta-schemas are registered in the global registry.` + // SchemaRegistry.Validator.ValidateEvent(eventJson, messageBrokerEvent.Name.Domain, messageBrokerEvent.Name.Value, messageBrokerEvent.Version); + + await _kafkaMessageProducer.Produce(GetTopic(), eventJson, cancellationToken); + } + + protected abstract TMessageBrokerEvent MapToMessageBrokerEvent(TDomainEvent domainEvent); + protected abstract string GetTopic(); +} \ No newline at end of file diff --git a/src/Accounting/MessageBrokerEvent.cs b/src/Accounting/MessageBrokerEvent.cs new file mode 100644 index 0000000..538a4c4 --- /dev/null +++ b/src/Accounting/MessageBrokerEvent.cs @@ -0,0 +1,41 @@ +using MediatR; + +namespace Accounting; + +public class MessageBrokerEvent : INotification +{ + public MessageBrokerEvent(T data, Guid id, MessageBrokerEventName name, int version, DateTime createdAt) + { + Data = data; + Id = id; + Name = name; + Version = version; + CreatedAt = createdAt; + } + + public Guid Id { get; } + public MessageBrokerEventName Name { get; } + public int Version { get; } + public DateTime CreatedAt { get; } + public T Data { get; } +} + +public class MessageBrokerEvent : INotification +{ + public MessageBrokerEvent(Guid id, MessageBrokerEventName name, MessageBrokerEventType type, int version, DateTime createdAt, object data) + { + Id = id; + Name = name; + Version = version; + Type = type; + CreatedAt = createdAt; + Data = data; + } + + public Guid Id { get; } + public MessageBrokerEventName Name { get; } + public MessageBrokerEventType Type { get; } + public int Version { get; } + public DateTime CreatedAt { get; } + public object Data { get; } +} \ No newline at end of file diff --git a/src/Accounting/MessageBrokerEventName.cs b/src/Accounting/MessageBrokerEventName.cs new file mode 100644 index 0000000..1d0d176 --- /dev/null +++ b/src/Accounting/MessageBrokerEventName.cs @@ -0,0 +1,13 @@ +namespace Accounting; + +public struct MessageBrokerEventName +{ + public MessageBrokerEventName(string domain, string value) + { + Domain = domain; + Value = value; + } + + public string Value { get; } + public string Domain { get; } +} \ No newline at end of file diff --git a/src/Accounting/MessageBrokerEventType.cs b/src/Accounting/MessageBrokerEventType.cs new file mode 100644 index 0000000..0cebe15 --- /dev/null +++ b/src/Accounting/MessageBrokerEventType.cs @@ -0,0 +1,7 @@ +namespace Accounting; + +public enum MessageBrokerEventType +{ + Streaming, + Behaviour +} \ No newline at end of file diff --git a/src/Auth/Integration/Events/PopugCreatedStreamingEventProducer.cs b/src/Auth/Integration/Events/PopugCreatedStreamingEventProducer.cs index f7e9a43..974b83f 100644 --- a/src/Auth/Integration/Events/PopugCreatedStreamingEventProducer.cs +++ b/src/Auth/Integration/Events/PopugCreatedStreamingEventProducer.cs @@ -19,5 +19,5 @@ protected override PopugCreatedStreamingEvent MapToMessageBrokerEvent(PopugCreat }); } - protected override string GetTopic() => "popug-user-streaming"; + protected override string GetTopic() => "user-streaming"; } \ No newline at end of file diff --git a/src/TaskTracker/Domain/Popug.cs b/src/TaskTracker/Domain/Popug.cs index 274ce8e..55a3b40 100644 --- a/src/TaskTracker/Domain/Popug.cs +++ b/src/TaskTracker/Domain/Popug.cs @@ -21,7 +21,7 @@ private Popug() public string Username { get; } public string FullName { get; set; } - public RoleType Role { get; private set; } + public RoleType Role { get; set; } public string? Email { get; init; } public bool IsActive { get; private set; } } \ No newline at end of file diff --git a/src/TaskTracker/Endpoints/CreateTaskEndpoint.cs b/src/TaskTracker/Endpoints/CreateTaskEndpoint.cs index 986f34f..2d8f6fc 100644 --- a/src/TaskTracker/Endpoints/CreateTaskEndpoint.cs +++ b/src/TaskTracker/Endpoints/CreateTaskEndpoint.cs @@ -43,24 +43,4 @@ public CreateTaskEndpoint(TaskTrackerDbContext taskTrackerDbContext) return Created($"/tasks/{task.Id}", response); } -} - -public interface ITaskAssigneeFinder -{ - Task Find(CancellationToken cancellationToken); -} - -class TaskAssigneeFinder : ITaskAssigneeFinder -{ - private readonly TaskTrackerDbContext _taskTrackerDbContext; - - public TaskAssigneeFinder(TaskTrackerDbContext taskTrackerDbContext) - { - _taskTrackerDbContext = taskTrackerDbContext; - } - - public Task Find(CancellationToken cancellationToken) - { - throw new NotImplementedException(); - } } \ No newline at end of file diff --git a/src/TaskTracker/Integration/EventConsuming/MediatrBasedMessageBrokerEventConsumer.cs b/src/TaskTracker/Integration/EventConsuming/MediatrBasedMessageBrokerEventConsumer.cs new file mode 100644 index 0000000..8545bfb --- /dev/null +++ b/src/TaskTracker/Integration/EventConsuming/MediatrBasedMessageBrokerEventConsumer.cs @@ -0,0 +1,13 @@ +using MediatR; + +namespace TaskTracker.Integration.EventConsuming; + +internal abstract class MediatrBasedMessageBrokerEventConsumer : INotificationHandler where TMessageBrokerEvent : MessageBrokerEvent +{ + protected abstract Task Consume(TMessageBrokerEvent @event, CancellationToken cancellationToken); + + public async Task Handle(TMessageBrokerEvent notification, CancellationToken cancellationToken) + { + await Consume(notification, cancellationToken); + } +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/EventConsuming/PopugCreatedStreamingEvent.cs b/src/TaskTracker/Integration/EventConsuming/PopugCreatedStreamingEvent.cs new file mode 100644 index 0000000..370ed06 --- /dev/null +++ b/src/TaskTracker/Integration/EventConsuming/PopugCreatedStreamingEvent.cs @@ -0,0 +1,25 @@ +using TaskTracker.Domain; + +namespace TaskTracker.Integration.EventConsuming; + +public class PopugCreatedStreamingEvent : MessageBrokerEvent +{ + private static readonly MessageBrokerEventName EventName = new("Users", "Created"); + private static readonly MessageBrokerEventType EventType = MessageBrokerEventType.Streaming; + + public PopugCreatedStreamingEvent(Guid id, DateTime createdAt, DataType data) : base(id, EventName, EventType, 1, createdAt, data) + { + } + + // public PopugCreatedStreamingEvent(DataType data, Guid id, MessageBrokerEventName name, int version, DateTime createdAt) : base(data, id, name, version, createdAt) + // { + // } + + public class DataType + { + public Guid Id { get; set; } + public string Username { get; set; } + public string FullName { get; set; } + public RoleType Role { get; set; } + } +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/EventConsuming/PopugCreatedStreamingEventConsumer.cs b/src/TaskTracker/Integration/EventConsuming/PopugCreatedStreamingEventConsumer.cs new file mode 100644 index 0000000..12bb166 --- /dev/null +++ b/src/TaskTracker/Integration/EventConsuming/PopugCreatedStreamingEventConsumer.cs @@ -0,0 +1,23 @@ +using TaskTracker.Domain; +using Task = System.Threading.Tasks.Task; + +namespace TaskTracker.Integration.EventConsuming; + +internal class PopugCreatedStreamingEventConsumer : MediatrBasedMessageBrokerEventConsumer +{ + private readonly TaskTrackerDbContext _dbContext; + + public PopugCreatedStreamingEventConsumer(TaskTrackerDbContext dbContext) + { + _dbContext = dbContext; + } + + protected override async Task Consume(PopugCreatedStreamingEvent @event, CancellationToken cancellationToken) + { + if (@event.Data is not PopugCreatedStreamingEvent.DataType eventData) + throw new InvalidCastException(); + + _dbContext.Popugs.Add(new Popug(eventData.Username, eventData.FullName, eventData.Role)); + await _dbContext.SaveChangesAsync(cancellationToken); + } +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/EventConsuming/RoleChangedBehaviourEvent.cs b/src/TaskTracker/Integration/EventConsuming/RoleChangedBehaviourEvent.cs new file mode 100644 index 0000000..a3062af --- /dev/null +++ b/src/TaskTracker/Integration/EventConsuming/RoleChangedBehaviourEvent.cs @@ -0,0 +1,20 @@ +using TaskTracker.Domain; + +namespace TaskTracker.Integration.EventConsuming; + +public class RoleChangedBehaviourEvent : MessageBrokerEvent +{ + private static readonly MessageBrokerEventName EventName = new("Users", "RoleChanged"); + private static readonly MessageBrokerEventType EventType = MessageBrokerEventType.Behaviour; + + public RoleChangedBehaviourEvent(Guid id, DateTime createdAt, DataType data) : base(id, EventName, EventType, 1, createdAt, data) + { + } + + public class DataType + { + public Guid PopugId { get; } + public RoleType NewRole { get; } + public RoleType PreviousRole { get; } + } +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/EventConsuming/RoleChangedBehaviourEventConsumer.cs b/src/TaskTracker/Integration/EventConsuming/RoleChangedBehaviourEventConsumer.cs new file mode 100644 index 0000000..73c0840 --- /dev/null +++ b/src/TaskTracker/Integration/EventConsuming/RoleChangedBehaviourEventConsumer.cs @@ -0,0 +1,29 @@ +using Microsoft.EntityFrameworkCore; + +namespace TaskTracker.Integration.EventConsuming; + +internal class RoleChangedBehaviourEventConsumer : MediatrBasedMessageBrokerEventConsumer +{ + private readonly TaskTrackerDbContext _dbContext; + + public RoleChangedBehaviourEventConsumer(TaskTrackerDbContext dbContext) + { + _dbContext = dbContext; + } + + protected override async Task Consume(RoleChangedBehaviourEvent @event, CancellationToken cancellationToken) + { + if (@event.Data is not RoleChangedBehaviourEvent.DataType eventData) + throw new InvalidCastException(); + + // TODO: Handle variant when popug role changed event received before popug created event + + var popug = await _dbContext.Popugs.FirstOrDefaultAsync(p => p.Id == eventData.PopugId, cancellationToken); + + if (popug is null) + throw new InvalidOperationException("Popug not found"); + + popug.Role = eventData.NewRole; + await _dbContext.SaveChangesAsync(cancellationToken); + } +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/EventProducing/TaskCompletedBehaviourEvent.cs b/src/TaskTracker/Integration/EventProducing/TaskCompletedBehaviourEvent.cs new file mode 100644 index 0000000..07b8607 --- /dev/null +++ b/src/TaskTracker/Integration/EventProducing/TaskCompletedBehaviourEvent.cs @@ -0,0 +1,17 @@ +namespace TaskTracker.Integration.EventProducing; + +public class TaskCompletedBehaviourEvent : MessageBrokerEvent +{ + private static readonly MessageBrokerEventName EventName = new("Tasks", "Completed"); + private static readonly MessageBrokerEventType EventType = MessageBrokerEventType.Behaviour; + + public TaskCompletedBehaviourEvent(Guid id, DateTime createdAt, DataType data) : base(id, EventName, EventType, 1, createdAt, data) + { + } + + public class DataType + { + public Guid Id { get; set; } + public Guid AssigneeId { get; set; } + } +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/EventProducing/TaskCompletedBehaviourEventProducer.cs b/src/TaskTracker/Integration/EventProducing/TaskCompletedBehaviourEventProducer.cs new file mode 100644 index 0000000..62506bf --- /dev/null +++ b/src/TaskTracker/Integration/EventProducing/TaskCompletedBehaviourEventProducer.cs @@ -0,0 +1,21 @@ +using TaskTracker.Domain.Events; + +namespace TaskTracker.Integration.EventProducing; + +internal class TaskCompletedBehaviourEventProducer : MessageBrokerDomainEventHandler +{ + public TaskCompletedBehaviourEventProducer(IKafkaMessageProducer kafkaMessageProducer) : base(kafkaMessageProducer) + { + } + + protected override TaskCompletedBehaviourEvent MapToMessageBrokerEvent(TaskCompletedDomainEvent domainEvent) + { + return new TaskCompletedBehaviourEvent(Guid.NewGuid(), DateTime.UtcNow, new TaskCompletedBehaviourEvent.DataType + { + Id = domainEvent.Task.Id, + AssigneeId = domainEvent.Task.Assignee.Id + }); + } + + protected override string GetTopic() => "task-life-cycle"; +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/EventProducing/TaskCreatedBehaviourEvent.cs b/src/TaskTracker/Integration/EventProducing/TaskCreatedBehaviourEvent.cs new file mode 100644 index 0000000..26b298a --- /dev/null +++ b/src/TaskTracker/Integration/EventProducing/TaskCreatedBehaviourEvent.cs @@ -0,0 +1,17 @@ +namespace TaskTracker.Integration.EventProducing; + +public class TaskCreatedBehaviourEvent : MessageBrokerEvent +{ + private static readonly MessageBrokerEventName EventName = new("Tasks", "Created"); + private static readonly MessageBrokerEventType EventType = MessageBrokerEventType.Behaviour; + + public TaskCreatedBehaviourEvent(Guid id, DateTime createdAt, DataType data) : base(id, EventName, EventType, 1, createdAt, data) + { + } + + public class DataType + { + public Guid Id { get; set; } + public Guid AssigneeId { get; set; } + } +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/Events/TaskCreatedBehaviourEventProducer.cs b/src/TaskTracker/Integration/EventProducing/TaskCreatedBehaviourEventProducer.cs similarity index 68% rename from src/TaskTracker/Integration/Events/TaskCreatedBehaviourEventProducer.cs rename to src/TaskTracker/Integration/EventProducing/TaskCreatedBehaviourEventProducer.cs index 4914d04..a881f36 100644 --- a/src/TaskTracker/Integration/Events/TaskCreatedBehaviourEventProducer.cs +++ b/src/TaskTracker/Integration/EventProducing/TaskCreatedBehaviourEventProducer.cs @@ -1,6 +1,6 @@ using TaskTracker.Domain.Events; -namespace TaskTracker.Integration.Events; +namespace TaskTracker.Integration.EventProducing; internal class TaskCreatedBehaviourEventProducer : MessageBrokerDomainEventHandler { @@ -13,11 +13,7 @@ protected override TaskCreatedBehaviourEvent MapToMessageBrokerEvent(TaskCreated return new TaskCreatedBehaviourEvent(Guid.NewGuid(), DateTime.UtcNow, new TaskCreatedBehaviourEvent.DataType { Id = domainEvent.Task.Id, - Title = domainEvent.Task.Title, - Description = domainEvent.Task.Description, - Fee = (double) domainEvent.Task.Price.Fee, - Reward = (double) domainEvent.Task.Price.Reward, - Status = domainEvent.Task.Status.ToString() + AssigneeId = domainEvent.Task.Assignee.Id }); } diff --git a/src/TaskTracker/Integration/EventProducing/TaskCreatedStreamingEvent.cs b/src/TaskTracker/Integration/EventProducing/TaskCreatedStreamingEvent.cs new file mode 100644 index 0000000..35a6f03 --- /dev/null +++ b/src/TaskTracker/Integration/EventProducing/TaskCreatedStreamingEvent.cs @@ -0,0 +1,22 @@ +namespace TaskTracker.Integration.EventProducing; + +public class TaskCreatedStreamingEvent : MessageBrokerEvent +{ + private static readonly MessageBrokerEventName EventName = new("Tasks", "Created"); + private static readonly MessageBrokerEventType EventType = MessageBrokerEventType.Streaming; + + public TaskCreatedStreamingEvent(Guid id, DateTime createdAt, DataType data) : base(id, EventName, EventType, 2, createdAt, data) + { + } + + public class DataType + { + public Guid Id { get; set; } + public string Title { get; set; } + public string Description { get; set; } + public double Fee { get; set; } + public double Reward { get; set; } + public TaskStatus Status { get; set; } + public Guid AssigneeId { get; set; } + } +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/EventProducing/TaskCreatedStreamingEventProducer.cs b/src/TaskTracker/Integration/EventProducing/TaskCreatedStreamingEventProducer.cs new file mode 100644 index 0000000..0c11ba2 --- /dev/null +++ b/src/TaskTracker/Integration/EventProducing/TaskCreatedStreamingEventProducer.cs @@ -0,0 +1,26 @@ +using TaskTracker.Domain.Events; + +namespace TaskTracker.Integration.EventProducing; + +internal class TaskCreatedStreamingEventProducer : MessageBrokerDomainEventHandler +{ + public TaskCreatedStreamingEventProducer(IKafkaMessageProducer kafkaMessageProducer) : base(kafkaMessageProducer) + { + } + + protected override TaskCreatedStreamingEvent MapToMessageBrokerEvent(TaskCreatedDomainEvent domainEvent) + { + return new TaskCreatedStreamingEvent(Guid.NewGuid(), DateTime.UtcNow, new TaskCreatedStreamingEvent.DataType + { + Id = domainEvent.Task.Id, + AssigneeId = domainEvent.Task.Assignee.Id, + Title = domainEvent.Task.Title, + Description = domainEvent.Task.Description, + Fee = (double)domainEvent.Task.Price.Fee, + Reward = (double)domainEvent.Task.Price.Reward, + Status = (TaskStatus)domainEvent.Task.Status + }); + } + + protected override string GetTopic() => "task-streaming"; +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/EventProducing/TaskCreatedStreamingEventV2.cs b/src/TaskTracker/Integration/EventProducing/TaskCreatedStreamingEventV2.cs new file mode 100644 index 0000000..fb3bb60 --- /dev/null +++ b/src/TaskTracker/Integration/EventProducing/TaskCreatedStreamingEventV2.cs @@ -0,0 +1,23 @@ +namespace TaskTracker.Integration.EventProducing; + +public class TaskCreatedStreamingEventV2 : MessageBrokerEvent +{ + private static readonly MessageBrokerEventName EventName = new("Tasks", "Created"); + private static readonly MessageBrokerEventType EventType = MessageBrokerEventType.Streaming; + + public TaskCreatedStreamingEventV2(Guid id, DateTime createdAt, DataType data) : base(id, EventName, EventType, 2, createdAt, data) + { + } + + public class DataType + { + public Guid Id { get; set; } + public string JiraId { get; set; } + public string Title { get; set; } + public string Description { get; set; } + public double Fee { get; set; } + public double Reward { get; set; } + public string Status { get; set; } + public Guid AssigneeId { get; set; } + } +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/EventProducing/TaskReassignedBehaviourEvent.cs b/src/TaskTracker/Integration/EventProducing/TaskReassignedBehaviourEvent.cs new file mode 100644 index 0000000..8e63cc9 --- /dev/null +++ b/src/TaskTracker/Integration/EventProducing/TaskReassignedBehaviourEvent.cs @@ -0,0 +1,17 @@ +namespace TaskTracker.Integration.EventProducing; + +public class TaskReassignedBehaviourEvent : MessageBrokerEvent +{ + private static readonly MessageBrokerEventName EventName = new("Tasks", "Reassigned"); + private static readonly MessageBrokerEventType EventType = MessageBrokerEventType.Behaviour; + + public TaskReassignedBehaviourEvent(Guid id, DateTime createdAt, DataType data) : base(id, EventName, EventType, 1, createdAt, data) + { + } + + public class DataType + { + public Guid Id { get; set; } + public Guid NewAssigneeId { get; set; } + } +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/EventProducing/TaskReassignedBehaviourEventProducer.cs b/src/TaskTracker/Integration/EventProducing/TaskReassignedBehaviourEventProducer.cs new file mode 100644 index 0000000..5a71586 --- /dev/null +++ b/src/TaskTracker/Integration/EventProducing/TaskReassignedBehaviourEventProducer.cs @@ -0,0 +1,23 @@ +using TaskTracker.Domain.Events; + +namespace TaskTracker.Integration.EventProducing; + +internal class TaskReassignedBehaviourEventProducer : MessageBrokerDomainEventHandler +{ + public TaskReassignedBehaviourEventProducer(IKafkaMessageProducer kafkaMessageProducer) : base(kafkaMessageProducer) + { + } + + protected override TaskReassignedBehaviourEvent MapToMessageBrokerEvent(TaskReassignedDomainEvent domainEvent) + { + var lastAssigment = domainEvent.Task.Assignments.Last(); + + return new TaskReassignedBehaviourEvent(Guid.NewGuid(), DateTime.UtcNow, new TaskReassignedBehaviourEvent.DataType() + { + Id = domainEvent.Task.Id, + NewAssigneeId = lastAssigment.NewPopug.Id + }); + } + + protected override string GetTopic() => "task-life-cycle"; +} \ No newline at end of file diff --git a/src/TaskTracker/Integration/Events/TaskUpdatedStreamingEvent.cs b/src/TaskTracker/Integration/EventProducing/TaskUpdatedStreamingEvent.cs similarity index 93% rename from src/TaskTracker/Integration/Events/TaskUpdatedStreamingEvent.cs rename to src/TaskTracker/Integration/EventProducing/TaskUpdatedStreamingEvent.cs index 4b58236..e66da60 100644 --- a/src/TaskTracker/Integration/Events/TaskUpdatedStreamingEvent.cs +++ b/src/TaskTracker/Integration/EventProducing/TaskUpdatedStreamingEvent.cs @@ -1,4 +1,4 @@ -namespace TaskTracker.Integration.Events; +namespace TaskTracker.Integration.EventProducing; public class TaskUpdatedStreamingEvent : MessageBrokerEvent { diff --git a/src/TaskTracker/Integration/IKafkaMessageProducer.cs b/src/TaskTracker/Integration/IKafkaMessageProducer.cs index 1a33d97..46de41a 100644 --- a/src/TaskTracker/Integration/IKafkaMessageProducer.cs +++ b/src/TaskTracker/Integration/IKafkaMessageProducer.cs @@ -3,4 +3,33 @@ namespace TaskTracker.Integration; internal interface IKafkaMessageProducer : IDisposable { Task Produce(string topic, string message, CancellationToken cancellationToken); +} + +internal class ResistantKafkaMessageProducer : IKafkaMessageProducer +{ + private readonly IKafkaMessageProducer _kafkaMessageProducer; + private readonly ILogger _logger; + + public ResistantKafkaMessageProducer(IKafkaMessageProducer kafkaMessageProducer, ILogger logger) + { + _kafkaMessageProducer = kafkaMessageProducer; + _logger = logger; + } + + public void Dispose() + { + _kafkaMessageProducer.Dispose(); + } + + public async Task Produce(string topic, string message, CancellationToken cancellationToken) + { + // TODO: Use Poly for retry policy + + var messageHash = message.GetHashCode(); + _logger.LogInformation("Sending {Message} to {Topic} [{MessageHash}]..", topic, message, messageHash); + + await _kafkaMessageProducer.Produce(topic, message, cancellationToken); + + _logger.LogInformation("Message [{MessageHash}] to {Topic} sent", messageHash, topic); + } } \ No newline at end of file diff --git a/src/TaskTracker/Integration/KafkaMessageHandler.cs b/src/TaskTracker/Integration/KafkaMessageHandler.cs index b3b5995..e8ee572 100644 --- a/src/TaskTracker/Integration/KafkaMessageHandler.cs +++ b/src/TaskTracker/Integration/KafkaMessageHandler.cs @@ -1,29 +1,28 @@ +using System.Text.Json; using MediatR; +using TaskTracker.Integration.EventConsuming; namespace TaskTracker.Integration; internal class KafkaMessageHandler : IKafkaMessageHandler { private readonly IPublisher _publisher; + private readonly IServiceScopeFactory _serviceScopeFactory; - public KafkaMessageHandler(IPublisher publisher) + public KafkaMessageHandler(IPublisher publisher, IServiceScopeFactory serviceScopeFactory) { _publisher = publisher; + _serviceScopeFactory = serviceScopeFactory; } public async Task Handle(string topic, string message) { - // TODO: Fix hardcoded - - await _publisher.Publish(new PopugCreatedStreamingMessage() - { - - }); - - if (topic.Contains("-streaming")) - { - // Streaming - } - + await using var scope = _serviceScopeFactory.CreateAsyncScope(); + + if (JsonSerializer.Deserialize(message, typeof(MessageBrokerEvent)) is not MessageBrokerEvent messageBrokerEvent) + throw new InvalidOperationException("Can't deserialize message to MessageBrokerEvent"); + + var popugCreatedStreamingEvent = (PopugCreatedStreamingEvent) messageBrokerEvent; + await _publisher.Publish(messageBrokerEvent); } } \ No newline at end of file diff --git a/src/TaskTracker/Integration/KafkaMessageListener.cs b/src/TaskTracker/Integration/KafkaMessageListener.cs index 24f7e92..0580d03 100644 --- a/src/TaskTracker/Integration/KafkaMessageListener.cs +++ b/src/TaskTracker/Integration/KafkaMessageListener.cs @@ -16,7 +16,7 @@ public KafkaMessageListener(IKafkaMessageHandler kafkaMessageHandler, ILogger consumer.Consume(stoppingToken), stoppingToken); - - // TODO: Prevent app crashing if problem with kafka occurs var consumeResult = await consumingTask; + await _kafkaMessageHandler.Handle(consumeResult.Topic, consumeResult.Message.Value); } - catch (ConsumeException ex) + catch (Exception ex) { - _logger.LogWarning(ex, "Consuming failed."); + _logger.LogError(ex, "Consuming message failed"); } } diff --git a/src/TaskTracker/Integration/MessageBrokerEvent.cs b/src/TaskTracker/Integration/MessageBrokerEvent.cs index b3d7ffc..735242b 100644 --- a/src/TaskTracker/Integration/MessageBrokerEvent.cs +++ b/src/TaskTracker/Integration/MessageBrokerEvent.cs @@ -1,6 +1,8 @@ -namespace TaskTracker.Integration; +using MediatR; -public class MessageBrokerEvent +namespace TaskTracker.Integration; + +public class MessageBrokerEvent : INotification { public MessageBrokerEvent(T data, Guid id, MessageBrokerEventName name, int version, DateTime createdAt) { @@ -18,7 +20,7 @@ public MessageBrokerEvent(T data, Guid id, MessageBrokerEventName name, int vers public T Data { get; } } -public class MessageBrokerEvent +public class MessageBrokerEvent : INotification { public MessageBrokerEvent(Guid id, MessageBrokerEventName name, MessageBrokerEventType type, int version, DateTime createdAt, object data) { diff --git a/src/TaskTracker/Integration/ResilientKafkaMessageHandler.cs b/src/TaskTracker/Integration/ResilientKafkaMessageHandler.cs new file mode 100644 index 0000000..97c4d72 --- /dev/null +++ b/src/TaskTracker/Integration/ResilientKafkaMessageHandler.cs @@ -0,0 +1,28 @@ +namespace TaskTracker.Integration; + +internal class ResilientKafkaMessageHandler : IKafkaMessageHandler +{ + private readonly IKafkaMessageHandler _kafkaMessageHandler; + private readonly ILogger _logger; + + public ResilientKafkaMessageHandler(IKafkaMessageHandler kafkaMessageHandler, ILogger logger) + { + _kafkaMessageHandler = kafkaMessageHandler; + _logger = logger; + } + + public async Task Handle(string topic, string message) + { + try + { + await _kafkaMessageHandler.Handle(topic, message); + } + catch (Exception ex) + { + // TODO: Ideally here must be dealing with dead letter queue (DLQ) + + // Currently just log dead message + _logger.LogError(ex, "Can't handle message from topic [{Topic}]: {MessageBody}", topic, message); + } + } +} \ No newline at end of file