Skip to content
This repository was archived by the owner on Oct 10, 2023. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
63bafe7
ft/TD-249/optimize db schema (#48)
mr-impossibru May 26, 2022
a6fa291
TD-249: fix refund aggregate (#49)
mr-impossibru May 30, 2022
0b9fe90
new kafka config + iam support (#52)
echerniak Jul 20, 2022
077cb04
Bump damsel to 1.569-09e7a75
ex01tus Jul 27, 2022
ab8a179
Added counting max for country (#54)
inallyoung Aug 3, 2022
58014df
Changed withdrawal_session table column type (#56)
inallyoung Aug 23, 2022
0068cd6
Fix script
Aug 23, 2022
20ec64a
OPS-165: add limit config (#60)
karle0wne Aug 24, 2022
624317f
Remove secure data columns (#62)
strug Sep 15, 2022
9353690
Removed null setting for party revision
Sep 20, 2022
f295a28
Add term id to daway (#65)
strug Sep 22, 2022
a24a3a5
fix npe with key (#68)
ggmaleva Sep 26, 2022
f7c6621
OPS-200: add indices (#69)
mr-impossibru Oct 4, 2022
8f4822e
add exrate listener (#72)
vitaxa Oct 19, 2022
5ee616d
add new limiter scopes (#73)
karle0wne Nov 24, 2022
c82b4aa
Add exception handling in DominantPoller
malkoas Dec 12, 2022
d9ca88e
Revert "Add exception handling in DominantPoller"
malkoas Dec 12, 2022
a95f513
Add exception handling in DominantPoller
malkoas Dec 12, 2022
f02e827
Revert "Add exception handling in DominantPoller"
malkoas Dec 12, 2022
72f4c17
Add exception handling in DominantPoller
malkoas Dec 12, 2022
65eaa71
Revert "Add exception handling in DominantPoller"
malkoas Dec 12, 2022
bf3311a
Add exception handling in DominantPoller (#76)
malkoas Dec 12, 2022
2703a2e
Fix polling (#88)
strug Feb 20, 2023
67895fe
Fix build (#89)
strug Feb 20, 2023
f3c1f8c
Bump damsel
tolkonepiu May 19, 2023
72f6b9f
Add session info (#92)
strug Jun 15, 2023
cb20cbc
Bump damsel (#93)
strug Jun 15, 2023
31a5a90
Add status cascad (#94)
strug Jun 15, 2023
9615c92
Add status cascad (#95)
strug Jun 15, 2023
672d15a
Add terminal (#96)
strug Jun 15, 2023
76d1b4d
add withdrawal adjustment (#100)
ggmaleva Aug 18, 2023
946ce65
fix adjustment id (#102)
ggmaleva Aug 18, 2023
f3d96b1
add withdrawal adjustment listener (#104)
ggmaleva Sep 18, 2023
43b53e6
add rack property in consumer group from file (#105)
ggmaleva Sep 20, 2023
bc2a58d
change WithdrawalAdjustment constraint and index (#106)
ggmaleva Sep 21, 2023
420b93f
refactoring rack file flow (#107)
ggmaleva Sep 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
6 changes: 3 additions & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
name: Maven Build Artifact
name: Build Maven Artifact

on:
pull_request:
branches:
- '*'
- '**'

jobs:
build:
uses: valitydev/java-workflow/.github/workflows/maven-service-build.yml@v1
uses: valitydev/base-workflow/.github/workflows/maven-service-build.yml@v2
11 changes: 4 additions & 7 deletions .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
name: Maven Deploy Artifact
name: Deploy Docker Image

on:
push:
branches:
- 'master'
- 'main'

env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}
- 'epic/**'

jobs:
deploy:
uses: valitydev/java-workflow/.github/workflows/maven-service-deploy.yml@v1
build-and-deploy:
uses: valitydev/base-workflow/.github/workflows/maven-service-deploy.yml@v2
secrets:
github-token: ${{ secrets.GITHUB_TOKEN }}
mm-webhook-url: ${{ secrets.MATTERMOST_WEBHOOK_URL }}
32 changes: 27 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>dev.vality</groupId>
<artifactId>service-parent-pom</artifactId>
<version>1.0.16</version>
<version>2.0.0-BETA-11</version>
</parent>

<artifactId>newway</artifactId>
Expand All @@ -25,7 +25,7 @@
<db.user>postgres</db.user>
<db.password>postgres</db.password>
<db.name>newway</db.name>
<db.schema>nw</db.schema>
<db.schema>dw</db.schema>
<local.pg.url>jdbc:postgresql://localhost:5432/newway</local.pg.url>
<local.pg.port>5432</local.pg.port>
<checkstyle.config.suppressions.path>./src/main/resources/checkstyle/checkstyle-suppressions.xml
Expand Down Expand Up @@ -65,7 +65,7 @@
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
<version>2.6.6</version>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
Expand Down Expand Up @@ -123,6 +123,10 @@
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
</dependency>

<!--dev.vality-->
<dependency>
Expand Down Expand Up @@ -155,17 +159,29 @@
<dependency>
<groupId>dev.vality</groupId>
<artifactId>damsel</artifactId>
<!-- TODO: bump version in parent -->
<version>1.597-bfedcb9</version>
</dependency>
<dependency>
<groupId>dev.vality</groupId>
<artifactId>fistful-proto</artifactId>
<version>1.145-c45166d</version>
<version>1.159-936ed9a</version>
</dependency>
<dependency>
<groupId>dev.vality</groupId>
<artifactId>xrates-proto</artifactId>
<version>1.23-bf0d62d</version>
</dependency>
<dependency>
<groupId>dev.vality</groupId>
<artifactId>limiter-proto</artifactId>
<version>1.33-31de59b</version>
</dependency>
<dependency>
<groupId>dev.vality</groupId>
<artifactId>exrates-proto</artifactId>
<version>1.3-875328b</version>
</dependency>
<dependency>
<groupId>dev.vality</groupId>
<artifactId>shared-resources</artifactId>
Expand All @@ -188,7 +204,13 @@
<dependency>
<groupId>dev.vality</groupId>
<artifactId>testcontainers-annotations</artifactId>
<version>1.4.0</version>
<version>1.4.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package dev.vality.newway.config;

import dev.vality.damsel.domain_config.RepositorySrv;
import dev.vality.newway.domain.Nw;
import dev.vality.newway.domain.Dw;
import dev.vality.woody.thrift.impl.http.THSpawnClientBuilder;
import org.jooq.Schema;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -25,6 +25,6 @@ public RepositorySrv.Iface dominantClient(@Value("${dmt.url}") Resource resource

@Bean
public Schema schema() {
return Nw.NW;
return Dw.DW;
}
}
18 changes: 9 additions & 9 deletions src/main/java/dev/vality/newway/config/CacheConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,23 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import dev.vality.newway.model.InvoiceWrapper;
import dev.vality.newway.model.InvoicingKey;
import dev.vality.newway.model.PaymentWrapper;
import dev.vality.newway.model.PartyShop;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
public class CacheConfig {

@Bean
public Cache<InvoicingKey, InvoiceWrapper> invoiceDataCache(@Value("${cache.invoice.size}") int cacheSize) {
return Caffeine.newBuilder().maximumSize(cacheSize).build();
public Cache<String, PartyShop> partyShopDataCache(@Value("${cache.party-shop.size}") int cacheSize,
@Value("${cache.party-shop.expire.after.sec}") long expireAfter) {
return Caffeine.newBuilder()
.maximumSize(cacheSize)
.expireAfterWrite(Duration.ofSeconds(expireAfter))
.build();
}

@Bean
public Cache<InvoicingKey, PaymentWrapper> paymentDataCache(@Value("${cache.payment.size}") int cacheSize) {
return Caffeine.newBuilder().maximumSize(cacheSize).build();
}
}
114 changes: 64 additions & 50 deletions src/main/java/dev/vality/newway/config/KafkaConfig.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
package dev.vality.newway.config;

import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory;
import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.newway.config.properties.KafkaConsumerProperties;
import dev.vality.newway.config.properties.KafkaSslProperties;
import dev.vality.newway.serde.CurrencyExchangeRateEventDeserializer;
import dev.vality.newway.serde.PayoutEventDeserializer;
import dev.vality.newway.serde.SinkEventDeserializer;
import dev.vality.newway.service.FileService;
import dev.vality.payout.manager.Event;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
Expand All @@ -24,64 +23,50 @@
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(KafkaSslProperties.class)
@SuppressWarnings("LineLength")
public class KafkaConfig {

private final KafkaProperties kafkaProperties;
private final KafkaConsumerProperties kafkaConsumerProperties;
private final FileService fileService;

@Value("${kafka.topics.party-management.consumer.group-id}")
private String partyConsumerGroup;
@Value("${kafka.client-id}")
private String clientId;
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${kafka.topics.exrate.consumer.group-id}")
private String exrateConsumerGroup;

@Value("${kafka.topics.withdrawal-adjustment.consumer.group-id}")
private String withdrawalAdjustmentConsumerGroup;

@Value("${kafka.rack.path:/tmp/.kafka_rack_env}")
private String rackPath;

@Bean
public Map<String, Object> consumerConfigs(KafkaSslProperties kafkaSslProperties) {
return createConsumerConfig(kafkaSslProperties);
public Map<String, Object> consumerConfigs() {
return createConsumerConfig();
}

private Map<String, Object> createConsumerConfig(KafkaSslProperties kafkaSslProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
private Map<String, Object> createConsumerConfig() {
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SinkEventDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroupId());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConsumerProperties.isEnableAutoCommit());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerProperties.getAutoOffsetReset());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerProperties.getMaxPollRecords());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConsumerProperties.getSessionTimeoutMs());
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerProperties.getMaxPollIntervalMs());
configureSsl(props, kafkaSslProperties);
return props;
}

private void configureSsl(Map<String, Object> props, KafkaSslProperties kafkaSslProperties) {
if (kafkaSslProperties.isEnabled()) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
new File(kafkaSslProperties.getTrustStoreLocation()).getAbsolutePath());
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaSslProperties.getTrustStorePassword());
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaSslProperties.getKeyStoreType());
props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaSslProperties.getTrustStoreType());
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
new File(kafkaSslProperties.getKeyStoreLocation()).getAbsolutePath());
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaSslProperties.getKeyStorePassword());
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaSslProperties.getKeyPassword());
String clientRack = fileService.getClientRack(rackPath);
if (Objects.nonNull(clientRack)) {
props.put(ConsumerConfig.CLIENT_RACK_CONFIG, clientRack);
}
return props;
}

@Bean
public ConsumerFactory<String, MachineEvent> consumerFactory(KafkaSslProperties kafkaSslProperties) {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(kafkaSslProperties));
public ConsumerFactory<String, MachineEvent> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
Expand Down Expand Up @@ -127,10 +112,23 @@ public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Event>> payoutContainerFactory(
KafkaSslProperties kafkaSslProperties) {
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> withdrawalAdjustmentContainerFactory() {
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SinkEventDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, withdrawalAdjustmentConsumerGroup);
String clientRack = fileService.getClientRack(rackPath);
if (Objects.nonNull(clientRack)) {
props.put(ConsumerConfig.CLIENT_RACK_CONFIG, clientRack);
}
ConsumerFactory<String, MachineEvent> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getWithdrawalAdjustmentConcurrency());
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Event>> payoutContainerFactory() {
DefaultKafkaConsumerFactory<String, Event> kafkaConsumerFactory =
new DefaultKafkaConsumerFactory<>(createConsumerConfig(kafkaSslProperties));
new DefaultKafkaConsumerFactory<>(createConsumerConfig());
kafkaConsumerFactory.setValueDeserializer(new PayoutEventDeserializer());
ConcurrentKafkaListenerContainerFactory<String, Event> factory =
new ConcurrentKafkaListenerContainerFactory<>();
Expand All @@ -157,17 +155,33 @@ public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> partyManagementContainerFactory(
KafkaSslProperties kafkaSslProperties) {
Map<String, Object> configs = createConsumerConfig(kafkaSslProperties);
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> partyManagementContainerFactory() {
Map<String, Object> configs = createConsumerConfig();
configs.put(ConsumerConfig.GROUP_ID_CONFIG, partyConsumerGroup);
ConsumerFactory<String, MachineEvent> consumerFactory = new DefaultKafkaConsumerFactory<>(configs);
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getPartyManagementConcurrency());
}

private KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> createConcurrentFactory(
ConsumerFactory<String, MachineEvent> consumerFactory, int threadsNumber) {
ConcurrentKafkaListenerContainerFactory<String, MachineEvent> factory =
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> limitConfigContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getLimitConfigConcurrency());
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, CurrencyEvent>> exchangeRateContainerFactory() {
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CurrencyExchangeRateEventDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, exrateConsumerGroup);
ConsumerFactory<String, CurrencyEvent> consumerFactory = new DefaultKafkaConsumerFactory<>(props);

return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getExrateConcurrency());
}

private <T> KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, T>> createConcurrentFactory(
ConsumerFactory<String, T> consumerFactory, int threadsNumber) {
ConcurrentKafkaListenerContainerFactory<String, T> factory =
new ConcurrentKafkaListenerContainerFactory<>();
initFactory(consumerFactory, threadsNumber, factory);
return factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
@EnableSchedulerLock(defaultLockAtMostFor = "PT5M")
public class SchedulerConfig {

public static final String TABLE_NAME = "nw.shedlock";
public static final String TABLE_NAME = "dw.shedlock";

@Bean
public DominantPoller dominantPoller(RepositorySrv.Iface dominantClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,7 @@
@ConfigurationProperties(prefix = "kafka.consumer")
public class KafkaConsumerProperties {

private String autoOffsetReset;
private boolean enableAutoCommit;
private String groupId;
private int maxPollRecords;
private int maxPollIntervalMs;
private int sessionTimeoutMs;
private int invoicingConcurrency;
private int recurrentPaymentToolConcurrency;
private int partyManagementConcurrency;
Expand All @@ -29,5 +24,8 @@ public class KafkaConsumerProperties {
private int sourceConcurrency;
private int destinationConcurrency;
private int withdrawalSessionConcurrency;
private int limitConfigConcurrency;
private int exrateConcurrency;
private int withdrawalAdjustmentConcurrency;

}
Loading