Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ All notable changes to this project will be documented in this file.
### Changed
- Upgrade `investment-service-api` version from `1.4.1` to `1.6.0`; regenerate API clients and fix all compilation errors in production and test sources.


## [10.1.1](https://github.com/Backbase/stream-services/compare/10.1.0...10.1.1)
### Changed
- Added dependency validation to stream-compositions services pom.xml to fix product validation issues for arrangements with additional properties.

## [10.1.0](https://github.com/Backbase/stream-services/compare/9.17.0...10.1.0)
## [10.1.0]
### Changed
- Align Spring Boot and Spring Cloud versions with Service SDK 21.0.1 managed stack.
- Remove local Spring metadata plugin and Azure Service Bus version overrides in favor of the Service SDK 21.0.1 managed dependency chain.
Expand Down
321 changes: 203 additions & 118 deletions stream-investment/INVESTMENT_API_ENDPOINTS_USED.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion stream-investment/investment-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<properties>
<checkstyle.disable.checks>true</checkstyle.disable.checks>
<investment-service-api.version>1.6.0</investment-service-api.version>
<investment-service-api.version>1.6.2</investment-service-api.version>
</properties>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package com.backbase.stream.investment;

import static com.backbase.investment.api.service.v1.model.PortfolioProduct.JSON_PROPERTY_ADVICE_ENGINE;
import static com.backbase.investment.api.service.v1.model.PortfolioProduct.JSON_PROPERTY_EXTERNAL_ID;
import static com.backbase.investment.api.service.v1.model.PortfolioProduct.JSON_PROPERTY_EXTRA_DATA;
import static com.backbase.investment.api.service.v1.model.PortfolioProduct.JSON_PROPERTY_MODEL_PORTFOLIO;
import static com.backbase.investment.api.service.v1.model.PortfolioProduct.JSON_PROPERTY_PRODUCT_CATEGORY;
import static com.backbase.investment.api.service.v1.model.PortfolioProduct.JSON_PROPERTY_PRODUCT_TYPE;

import com.backbase.investment.api.service.v1.model.InvestorModelPortfolio;
import com.backbase.investment.api.service.v1.model.PortfolioProductBadge;
import com.backbase.investment.api.service.v1.model.PortfolioProductStatusEnum;
Expand Down Expand Up @@ -30,19 +37,19 @@ public class ProductPortfolio {
private Resource imageResource;
private Integer order;
private PortfolioProductBadge badge;
@JsonProperty("external_id")
@JsonProperty(JSON_PROPERTY_EXTERNAL_ID)
private String externalId;
private PortfolioProductStatusEnum status = PortfolioProductStatusEnum.ACTIVE;
@JsonProperty("product_category")
@JsonProperty(JSON_PROPERTY_PRODUCT_CATEGORY)
private String productCategory;
private UUID uuid;
@JsonProperty("advice_engine")
@JsonProperty(JSON_PROPERTY_ADVICE_ENGINE)
private String adviceEngine;
@JsonProperty("model_portfolio")
@JsonProperty(JSON_PROPERTY_MODEL_PORTFOLIO)
private InvestorModelPortfolio modelPortfolio;
@JsonProperty("product_type")
@JsonProperty(JSON_PROPERTY_PRODUCT_TYPE)
private ProductTypeEnum productType;
@JsonProperty("extra_data")
@JsonProperty(JSON_PROPERTY_EXTRA_DATA)
private Map<String, String> extraData = new HashMap<>();

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class InvestmentSaga implements StreamTaskExecutor<InvestmentTask> {
public static final String RESULT_FAILED = "failed";

private static final String INVESTMENT_PRODUCTS = "investment-products";
private static final String INVESTMENT_PORTFOLIO_ALLOCATIONS = "investment-portfolio-allocations";
private static final String INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS = "investment-portfolio-trading-accounts";
private static final String INVESTMENT_RISK_ASSESSMENTS = "investment-risk-assessments";
private static final String INVESTMENT_RISK_QUESTIONS = "investment-risk-questions";
Expand Down Expand Up @@ -97,19 +98,21 @@ public Mono<InvestmentTask> executeTask(InvestmentTask streamTask) {
}
log.info("Starting investment saga execution: taskId={}, taskName={}",
streamTask.getId(), streamTask.getName());
return this.upsertInvestmentPortfolioModels(streamTask)
.flatMap(this::loadAssets)
return this.loadAssets(streamTask)
.flatMap(this::upsertClients)
.flatMap(this::upsertRiskQuestions)
.flatMap(this::upsertRiskAssessments)
.flatMap(this::upsertInvestmentPortfolioModels)
.flatMap(this::upsertInvestmentProducts)
.flatMap(this::upsertInvestmentPortfolios)
.flatMap(this::upsertPortfolioTradingAccounts)
.flatMap(this::upsertInvestmentPortfolioDeposits)
.flatMap(this::upsertPortfoliosAllocations)
.doOnNext(completedTask -> log.info(
"Successfully completed investment saga: taskId={}, taskName={}, state={}",
completedTask.getId(), completedTask.getName(), completedTask.getState()))
.doOnSuccess(completedTask -> {
streamTask.setState(State.COMPLETED);
log.info("Successfully completed investment saga: taskId={}, taskName={}, state={}",
completedTask.getId(), completedTask.getName(), completedTask.getState());
})
.doOnError(throwable -> {
log.error("Failed to execute investment saga: taskId={}, taskName={}",
streamTask.getId(), streamTask.getName(), throwable);
Expand All @@ -118,19 +121,19 @@ public Mono<InvestmentTask> executeTask(InvestmentTask streamTask) {
"Investment saga failed: " + throwable.getMessage());
streamTask.setState(State.FAILED);
})
.onErrorResume(throwable -> Mono.just(streamTask));
.onErrorResume(_ -> Mono.just(streamTask));
}

private Mono<InvestmentTask> upsertInvestmentPortfolioDeposits(InvestmentTask investmentTask) {
return Flux.fromIterable(Objects.requireNonNullElse(investmentTask.getData().getPortfolios(), List.of()))
.flatMap(investmentPortfolioService::upsertDeposits)
.onErrorResume(throwable -> {
log.warn("Failed to create deposit for portfolio", throwable);
log.warn("Failed to create deposit for portfolio: taskId={}", investmentTask.getId(), throwable);
return Mono.empty();
})
.flatMap(investmentPortfolioAllocationService::createDepositAllocation)
.collectList()
.map(o -> investmentTask);
.map(_ -> investmentTask);
}

/**
Expand All @@ -140,7 +143,7 @@ private Mono<InvestmentTask> upsertInvestmentPortfolioDeposits(InvestmentTask in
* Manual cleanup should be performed if necessary through the Investment Service API.
*
* @param streamTask the task to rollback
* @return null - rollback not implemented
* @return empty {@link Mono} — rollback not implemented
*/
@Override
public Mono<InvestmentTask> rollBack(InvestmentTask streamTask) {
Expand All @@ -151,21 +154,27 @@ public Mono<InvestmentTask> rollBack(InvestmentTask streamTask) {

private Mono<InvestmentTask> upsertPortfoliosAllocations(InvestmentTask investmentTask) {
InvestmentData data = investmentTask.getData();
List<InvestmentPortfolio> portfolios = Objects.requireNonNullElse(data.getPortfolios(), List.of());
return asyncTaskService.checkPriceAsyncTasksFinished(data.getPriceAsyncTasks())
.thenMany(Flux.fromIterable(Objects.requireNonNullElse(data.getPortfolios(), List.of()))
.thenMany(Flux.fromIterable(portfolios)
.flatMap(
p -> investmentPortfolioAllocationService.generateAllocations(p,
data.getIngestedPortfolioProducts(),
investmentTask.getData().getInvestmentAssetData())))
.collectList()
.doOnError(throwable -> {
log.error("Allocation generation failed for portfolios:{} taskId={}",
data.getPortfolios().stream().map(InvestmentPortfolio::getPortfolio).map(PortfolioList::getUuid)
.toList(), investmentTask.getId(),
log.error("Allocation generation failed: taskId={}, portfolioUuids={}",
investmentTask.getId(),
portfolios.stream()
.map(InvestmentPortfolio::getPortfolio)
.filter(Objects::nonNull)
.map(PortfolioList::getUuid)
.toList(),
throwable);
investmentTask.error(INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS, OP_UPSERT, RESULT_FAILED,
investmentTask.error(INVESTMENT_PORTFOLIO_ALLOCATIONS, OP_UPSERT, RESULT_FAILED,
investmentTask.getName(), investmentTask.getId(),
"Failed to upsert investment portfolio trading accounts: " + throwable.getMessage());
"Failed to generate investment portfolio allocations: " + throwable.getMessage());
investmentTask.setState(State.FAILED);
})
.map(_ -> investmentTask);
}
Expand Down Expand Up @@ -204,7 +213,7 @@ private Mono<InvestmentTask> upsertInvestmentPortfolios(InvestmentTask investmen
investmentTask.info(INVESTMENT_PORTFOLIOS, OP_UPSERT, RESULT_CREATED,
investmentTask.getName(), investmentTask.getId(),
UPSERTED_PREFIX + portfolios.size() + " investment portfolios");
investmentTask.setState(State.COMPLETED);

investmentTask.setPortfolios(portfolios);

log.info("Successfully upserted all investment portfolios: taskId={}, portfolioCount={}",
Expand All @@ -218,7 +227,6 @@ private Mono<InvestmentTask> upsertInvestmentPortfolios(InvestmentTask investmen
investmentTask.error(INVESTMENT_PORTFOLIOS, OP_UPSERT, RESULT_FAILED,
investmentTask.getName(), investmentTask.getId(),
"Failed to upsert investment portfolios: " + throwable.getMessage());
investmentTask.setState(State.FAILED);
});
}

Expand All @@ -239,7 +247,7 @@ private Mono<InvestmentTask> upsertInvestmentPortfolioModels(InvestmentTask inve
investmentTask.getName(), investmentTask.getId(),
UPSERTED_PREFIX + modelPortfolio.size() + " investment portfolio models");

log.info("Successfully upserted all investment portfolio models: taskId={}, productCount={}",
log.info("Successfully upserted all investment portfolio models: taskId={}, modelCount={}",
investmentTask.getId(), modelPortfolio.size());

return investmentTask;
Expand All @@ -256,19 +264,30 @@ private Mono<InvestmentTask> upsertInvestmentPortfolioModels(InvestmentTask inve

private Mono<InvestmentTask> loadAssets(InvestmentTask investmentTask) {
if (coreConfigurationProperties.isAssetUniverseEnabled()) {
log.debug("Skip loading assets. Assets have to be provided on previous step");
log.debug("Skipping asset load in investment saga; asset universe ingestion enabled: taskId={}",
investmentTask.getId());
return Mono.just(investmentTask);
}
log.info("Loading assets");
List<Asset> assets = investmentTask.getData().getInvestmentAssetData().getAssets();
InvestmentData data = investmentTask.getData();
if (data.getInvestmentAssetData() == null) {
log.debug("Skipping asset load; no investment asset data on task: taskId={}", investmentTask.getId());
return Mono.just(investmentTask);
}
List<Asset> assets = Objects.requireNonNullElse(data.getInvestmentAssetData().getAssets(), List.of());
if (assets.isEmpty()) {
log.debug("Skipping asset load; asset list is empty: taskId={}", investmentTask.getId());
return Mono.just(investmentTask);
}
log.info("Loading assets from asset universe API: taskId={}, assetCount={}",
investmentTask.getId(), assets.size());
return Flux.fromIterable(assets)
.flatMap(asset -> assetUniverseApi.getAsset(asset.getKeyString(), null, null, null)
.map(a -> {
asset.setUuid(a.getUuid());
return asset;
}))
.collectList()
.map(o -> investmentTask);
.map(_ -> investmentTask);

}

Expand Down Expand Up @@ -406,11 +425,11 @@ private static boolean isAssessmentApplicable(UserRiskAssessment a, String userN
}

private Mono<InvestmentTask> upsertPortfolioTradingAccounts(InvestmentTask investmentTask) {
List<InvestmentPortfolioTradingAccount> investmentPortfolioTradingAccounts = investmentTask.getData()
.getInvestmentPortfolioTradingAccounts();
List<InvestmentPortfolioTradingAccount> investmentPortfolioTradingAccounts = Objects.requireNonNullElse(
investmentTask.getData().getInvestmentPortfolioTradingAccounts(), List.of());
int accountsCount = investmentPortfolioTradingAccounts.size();

log.info("Starting investment portfolio trading accounts upsert: taskId={}, arrangementCount={}",
log.info("Starting investment portfolio trading accounts upsert: taskId={}, accountsCount={}",
investmentTask.getId(), accountsCount);

investmentTask.info(INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS, OP_UPSERT, null, investmentTask.getName(),
Expand All @@ -421,12 +440,12 @@ private Mono<InvestmentTask> upsertPortfolioTradingAccounts(InvestmentTask inves
investmentTask.info(INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS, OP_UPSERT, RESULT_CREATED,
investmentTask.getName(), investmentTask.getId(),
UPSERTED_PREFIX + products.size() + " investment portfolio trading accounts");
log.info("Successfully upserted all investment portfolio trading accounts: taskId={}, productCount={}",
log.info("Successfully upserted all investment portfolio trading accounts: taskId={}, accountsCount={}",
investmentTask.getId(), products.size());
})
.thenReturn(investmentTask)
.doOnError(throwable -> {
log.error("Failed to upsert investment portfolio trading accounts: taskId={}, arrangementCount={}",
log.error("Failed to upsert investment portfolio trading accounts: taskId={}, accountsCount={}",
investmentTask.getId(), accountsCount, throwable);
investmentTask.error(INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS, OP_UPSERT, RESULT_FAILED,
investmentTask.getName(), investmentTask.getId(),
Expand Down Expand Up @@ -472,7 +491,6 @@ private Mono<InvestmentTask> upsertClients(InvestmentTask streamTask) {
streamTask.data(clients);
streamTask.info(INVESTMENT, OP_UPSERT, RESULT_CREATED, streamTask.getName(), streamTask.getId(),
UPSERTED_PREFIX + clients.size() + " investment clients");
streamTask.setState(State.COMPLETED);

log.info("Successfully upserted all clients: taskId={}, clientCount={}, successCount={}",
streamTask.getId(), clientCount, clients.size());
Expand All @@ -485,7 +503,6 @@ private Mono<InvestmentTask> upsertClients(InvestmentTask streamTask) {
streamTask.error(INVESTMENT, OP_UPSERT, RESULT_FAILED,
streamTask.getName(), streamTask.getId(),
"Failed to upsert investment clients: " + throwable.getMessage());
streamTask.setState(State.FAILED);
});
}

Expand Down
Loading
Loading