Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"inventory-storage.material-types.collection.get",
"inventory-storage.loan-types.collection.get",
"inventory-storage.holdings-note-types.collection.get",
"inventory-storage.bound-with-parts.collection.get",
"circulation-storage.loans.collection.get",
"circulation.requests.collection.get",
"orders.pieces.collection.get",
Expand All @@ -39,6 +40,7 @@
"inventory-storage.material-types.collection.get",
"inventory-storage.loan-types.collection.get",
"inventory-storage.holdings-note-types.collection.get",
"inventory-storage.bound-with-parts.collection.get",
"circulation-storage.loans.collection.get",
"circulation.requests.collection.get",
"orders.pieces.collection.get",
Expand All @@ -63,6 +65,7 @@
"inventory-storage.material-types.collection.get",
"inventory-storage.loan-types.collection.get",
"inventory-storage.holdings-note-types.collection.get",
"inventory-storage.bound-with-parts.collection.get",
"circulation-storage.loans.collection.get",
"circulation.requests.collection.get",
"orders.pieces.collection.get",
Expand Down Expand Up @@ -154,6 +157,10 @@
{
"id": "settings",
"version": "1.1 1.2"
},
{
"id": "bound-with-parts-storage",
"version": "2.0"
}
],
"optional": [
Expand Down Expand Up @@ -225,6 +232,7 @@
"inventory-storage.holdings.retrieve.collection.post",
"inventory-storage.location-units.libraries.collection.get",
"inventory-storage.locations.collection.get",
"inventory-storage.bound-with-parts.collection.get",
"inventory-storage.material-types.collection.get",
"inventory-storage.loan-types.collection.get",
"inventory-storage.holdings-note-types.collection.get",
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/folio/rtaccache/client/InventoryClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.folio.rtaccache.client;

import org.folio.rtaccache.domain.dto.BoundWithParts;
import org.folio.rtaccache.domain.dto.FolioCqlRequest;
import org.folio.rtaccache.domain.dto.HoldingRecords;
import org.folio.rtaccache.domain.dto.HoldingsNoteTypes;
Expand Down Expand Up @@ -27,6 +28,9 @@ public interface InventoryClient {
@PostMapping("/item-storage/items/retrieve")
Items getItems(@RequestBody FolioCqlRequest request);

@GetMapping("/inventory-storage/bound-with-parts")
BoundWithParts getBoundWithParts(@SpringQueryMap FolioCqlRequest request);

@GetMapping("/locations")
Locations getLocations(@SpringQueryMap FolioCqlRequest request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.folio.rtaccache.domain.dto.CirculationEntityType.REQUEST;
import static org.folio.rtaccache.domain.dto.InventoryEntityType.HOLDINGS;
import static org.folio.rtaccache.domain.dto.InventoryEntityType.ITEM;
import static org.folio.rtaccache.domain.dto.InventoryEntityType.ITEM_BOUND_WITH;
import static org.folio.rtaccache.domain.dto.InventoryEntityType.LIBRARY;
import static org.folio.rtaccache.domain.dto.InventoryEntityType.LOCATION;

Expand All @@ -29,6 +30,7 @@ public class KafkaMessageListener {
private static final String PIECE_LISTENER_ID = "mod-rtac-cache-piece-listener";
private static final String LOCATIONS_LISTENER_ID = "mod-rtac-cache-location-listener";
private static final String LIBRARIES_LISTENER_ID = "mod-rtac-cache-library-listener";
private static final String BOUND_WITH_LISTENER_ID = "mod-rtac-cache-bound-with-listener";
private static final String FOLIO_TENANT_ID_HEADER = "folio.tenantId";

private final SystemUserScopedExecutionService executionService;
Expand Down Expand Up @@ -117,7 +119,7 @@ public void handlePieceRecord(ConsumerRecord<String, PieceResourceEvent> consume
concurrency = "#{folioKafkaProperties.listener['location'].concurrency}",
topicPattern = "#{folioKafkaProperties.listener['location'].topicPattern}",
autoStartup = "false")
public void handleLocation(ConsumerRecord<String, InventoryResourceEvent> consumerRecord) {
public void handleLocationRecord(ConsumerRecord<String, InventoryResourceEvent> consumerRecord) {
var tenantId = consumerRecord.value().getTenant();
executionService.executeAsyncSystemUserScoped(tenantId, () -> {
var resourceEvent = consumerRecord.value();
Expand All @@ -133,7 +135,7 @@ public void handleLocation(ConsumerRecord<String, InventoryResourceEvent> consum
concurrency = "#{folioKafkaProperties.listener['library'].concurrency}",
topicPattern = "#{folioKafkaProperties.listener['library'].topicPattern}",
autoStartup = "false")
public void handleLibrary(ConsumerRecord<String, InventoryResourceEvent> consumerRecord) {
public void handleLibraryRecord(ConsumerRecord<String, InventoryResourceEvent> consumerRecord) {
var tenantId = consumerRecord.value().getTenant();
executionService.executeAsyncSystemUserScoped(tenantId, () -> {
var resourceEvent = consumerRecord.value();
Expand All @@ -142,6 +144,21 @@ public void handleLibrary(ConsumerRecord<String, InventoryResourceEvent> consume
} );
}

@KafkaListener(
id = BOUND_WITH_LISTENER_ID,
containerFactory = "inventoryKafkaListenerContainerFactory",
groupId = "#{folioKafkaProperties.listener['bound-with'].groupId}",
concurrency = "#{folioKafkaProperties.listener['bound-with'].concurrency}",
topicPattern = "#{folioKafkaProperties.listener['bound-with'].topicPattern}")
public void handleBoundWithRecord(ConsumerRecord<String, InventoryResourceEvent> consumerRecord) {
var tenantId = consumerRecord.value().getTenant();
executionService.executeAsyncSystemUserScoped(tenantId, () -> {
var resourceEvent = consumerRecord.value();
eventHandlerFactory.getInventoryHandler(resourceEvent.getType(), ITEM_BOUND_WITH)
.handle(resourceEvent);
} );
}

private String getFolioTenantFromHeader(ConsumerRecord<String, PieceResourceEvent> consumerRecord) {
return new String(consumerRecord
.headers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@ public interface RtacHoldingRepository extends JpaRepository<RtacHoldingEntity,
@Query(value = "SELECT * FROM rtac_holding WHERE rtac_holding_json->>'holdingsId' = :holdingsId", nativeQuery = true)
List<RtacHoldingEntity> findAllByHoldingsId(@Param("holdingsId") String holdingsId);

@Query(value = "SELECT * FROM rtac_holding WHERE rtac_holding_json->'location'->>'id' = :locationId", nativeQuery = true)
List<RtacHoldingEntity> findAllByLocationId(@Param("locationId") String locationId);

@Query(value = "SELECT * FROM rtac_holding WHERE rtac_holding_json->'library'->>'id' = :libraryId", nativeQuery = true)
List<RtacHoldingEntity> findAllByLibraryId(@Param("libraryId") String libraryId);

@Modifying(clearAutomatically = true, flushAutomatically = true)
@Query(value = "DELETE FROM rtac_holding WHERE rtac_holding_json->>'holdingsId' = :holdingsId", nativeQuery = true)
void deleteAllByHoldingsId(@Param("holdingsId") String holdingsId);
Expand Down Expand Up @@ -105,24 +99,4 @@ LocationStatusCounts AS (
nativeQuery = true)
List<RtacSummaryProjection> findRtacSummariesByInstanceIds(@Param("schemas") String schemas, @Param("instanceIds") UUID[] instanceIds, @Param("onlyShared") boolean onlyShared);

@Modifying
@Query(value = """
UPDATE rtac_holding_entity
SET rtac_holding_json = jsonb_set(
jsonb_set(
rtac_holding_json,
'{location,name}',
to_jsonb(:name::text)
),
'{location,code}',
to_jsonb(:code::text)
)
WHERE id IN (
SELECT id FROM rtac_holding_entity
WHERE rtac_holding_json->'location'->>'id' = :locationId
)
""", nativeQuery = true)
int updateLocationDataBatch(@Param("locationId") String locationId,
@Param("name") String name,
@Param("code") String code);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.folio.rtaccache.client.InventoryClient;
import org.folio.rtaccache.domain.RtacHoldingEntity;
import org.folio.rtaccache.domain.RtacHoldingId;
import org.folio.rtaccache.domain.dto.BoundWithPart;
import org.folio.rtaccache.domain.dto.FolioCqlRequest;
import org.folio.rtaccache.domain.dto.HoldingsRecord;
import org.folio.rtaccache.domain.dto.Instance;
Expand All @@ -30,14 +33,15 @@ public class RtacCacheGenerationService {
@Qualifier("applicationTaskExecutor")
private final AsyncTaskExecutor taskExecutor;
private final InventoryClient inventoryClient;
private final RtacHoldingBulkRepository rtacHoldingRepository;
private final RtacHoldingBulkRepository rtacHoldingBulkRepository;
private final RtacHoldingMappingService rtacHoldingMappingService;
private final CirculationService circulationService;
private final OrdersService ordersService;
private final FolioExecutionContext folioExecutionContext;
private static final Integer HOLDINGS_BATCH_SIZE = 50;
private static final Integer ITEMS_BATCH_SIZE = 500;
private static final String CONSORTIUM_SOURCE = "CONSORTIUM";
private static final Integer BOUND_WITH_BATCH_SIZE = 500;

public CompletableFuture<Void> generateRtacCache(String instanceId) {
log.info("Started RTAC cache generation for instance id: {} in tenant: {}", instanceId, folioExecutionContext.getTenantId());
Expand Down Expand Up @@ -65,7 +69,8 @@ private Runnable processIndividualHolding(Instance instance, HoldingsRecord hold
return () -> {
log.info("Processing holding id : {}", holding.getId());
saveHolding(instance, holding);
var itemsFuture = processItemsForHolding(instance, holding);
var itemsFuture = processDirectItemsForHolding(instance, holding)
.thenCompose(v -> processItemsBoundWithHolding(instance, holding));
var piecesFuture = processPiecesForHolding(instance, holding);
itemsFuture.join();
piecesFuture.join();
Expand All @@ -77,25 +82,60 @@ private void saveHolding(Instance instance, HoldingsRecord holding) {
var entityId = RtacHoldingId.from(rtacHolding);
var rtacHoldingEntity = new RtacHoldingEntity(entityId, isInstanceShared(instance), rtacHolding, Instant.now());
try {
rtacHoldingRepository.bulkUpsert(List.of(rtacHoldingEntity));
rtacHoldingBulkRepository.bulkUpsert(List.of(rtacHoldingEntity));
} catch (Exception e) {
log.error("Error during bulk upsert of RTAC holdings for holding: {}", e.getMessage(), e);
}
}

private CompletableFuture<Void> processItemsForHolding(Instance instance, HoldingsRecord holding) {
private CompletableFuture<Void> processDirectItemsForHolding(Instance instance, HoldingsRecord holding) {
var itemsOffset = 0;
var totalItems = getItemsTotalRecords(holding.getId());
var futures = new ArrayList<CompletableFuture<Void>>();
while (totalItems != 0 && itemsOffset < totalItems) {
var itemsCql = getItemsByHoldingIdCql(holding.getId());
var itemsCql = getByHoldingsIdCql(holding.getId());
var itemsRequest = new FolioCqlRequest(itemsCql, ITEMS_BATCH_SIZE, itemsOffset);
futures.add(processItemsBatch(instance, holding, itemsRequest));
itemsOffset += ITEMS_BATCH_SIZE;
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

private CompletableFuture<Void> processItemsBoundWithHolding(Instance instance, HoldingsRecord holdings) {
log.info("Processing bound-with items for holding id : {}", holdings.getId());
var boundWithPartOffset = 0;
var totalBoundWithParts = getBoundWithTotal(holdings);
log.info("Total bound-with parts for holding id {} : {}", holdings.getId(), totalBoundWithParts);
var futures = new ArrayList<CompletableFuture<Void>>();
while (totalBoundWithParts != 0 && boundWithPartOffset < totalBoundWithParts) {
var boundWithPartsCql = getByHoldingsIdCql(holdings.getId());
var boundWithPartsRequest = new FolioCqlRequest(boundWithPartsCql, ITEMS_BATCH_SIZE, boundWithPartOffset);
futures.add(processBondWithItemBatch(instance, holdings, boundWithPartsRequest));
boundWithPartOffset += BOUND_WITH_BATCH_SIZE;
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

private CompletableFuture<Void> processBondWithItemBatch(Instance instance, HoldingsRecord holdings, FolioCqlRequest boundWithPartsRequest) {
return CompletableFuture.supplyAsync(() -> {
var boundWithPartsResponse = inventoryClient.getBoundWithParts(boundWithPartsRequest);
log.info("Fetched {} bound-with parts for holding id: {}", boundWithPartsResponse.getTotalRecords(), holdings.getId());
return boundWithPartsResponse.getBoundWithParts()
.stream()
.map(BoundWithPart::getItemId)
.toList();
}, taskExecutor)
.thenComposeAsync(boundWithItemIds -> {
if (CollectionUtils.isEmpty(boundWithItemIds)) {
log.info("No bound-with itemIds collected for holding id: {}", holdings.getId());
return CompletableFuture.completedFuture(null);
}
var queryParamValue = buildIdOrCql(boundWithItemIds);
var folioCqlRequest = new FolioCqlRequest(queryParamValue, boundWithItemIds.size(), 0);
return processItemsBatch(instance, holdings, folioCqlRequest);
}, taskExecutor);
}

private CompletableFuture<Void> processPiecesForHolding(Instance instance, HoldingsRecord holding) {
return CompletableFuture.supplyAsync(() -> {
log.info("Sending request for pieces for holding id: {}", holding.getId());
Expand All @@ -110,7 +150,7 @@ private CompletableFuture<Void> processPiecesForHolding(Instance instance, Holdi
RtacHoldingId.from(rtacHolding), isInstanceShared(instance), rtacHolding, Instant.now()))
.toList();
try {
rtacHoldingRepository.bulkUpsert(rtacHoldings);
rtacHoldingBulkRepository.bulkUpsert(rtacHoldings);
log.info("Saved pieces for holding: {}", holding.getId());
} catch (Exception e) {
log.error("Error during bulk upsert of RTAC holdings for pieces: {}", e.getMessage(), e);
Expand All @@ -132,7 +172,7 @@ private CompletableFuture<Void> processItemsBatch(Instance instance, HoldingsRec
.map(item -> processIndividualItem(instance, holding, item, itemsLoanDueDateMap, itemsHoldCountMap))
.toList();
try {
rtacHoldingRepository.bulkUpsert(rtacHoldings);
rtacHoldingBulkRepository.bulkUpsert(rtacHoldings);
log.info("Saved items batch for holding: {} offset: {}", holding.getId(), request.getOffset());
} catch (Exception e) {
log.error("Error during bulk upsert of RTAC holdings: {}", e.getMessage(), e);
Expand All @@ -154,10 +194,12 @@ private boolean isInstanceShared(Instance instance) {
return instance.getSource() != null && instance.getSource().contains(CONSORTIUM_SOURCE);
}

private RtacHoldingEntity processIndividualItem(Instance instance, HoldingsRecord holding, Item item, Map<String, Date> dueDateMap, Map<String, Long> holdCountMap) {
private RtacHoldingEntity processIndividualItem(Instance instance, HoldingsRecord holding, Item item,
Map<String, Date> dueDateMap, Map<String, Long> holdCountMap) {
var rtacHolding = rtacHoldingMappingService.mapFrom(holding, item);
rtacHolding.setDueDate(dueDateMap.getOrDefault(rtacHolding.getId(), null));
rtacHolding.setTotalHoldRequests(Math.toIntExact(holdCountMap.getOrDefault(rtacHolding.getId(), 0L)));
rtacHolding.setIsBoundWith(isItemBoundWithHoldings(item, holding));
var entityId = RtacHoldingId.from(rtacHolding);
return new RtacHoldingEntity(entityId, isInstanceShared(instance), rtacHolding, Instant.now());
}
Expand Down Expand Up @@ -191,14 +233,30 @@ private Integer getHoldingsTotalRecords(String instanceId) {
return holdingsResponse.getTotalRecords();
}

private String getItemsByHoldingIdCql(String holdingId) {
return "holdingsRecordId==" + holdingId;
private String getByHoldingsIdCql(String holdingsId) {
return "holdingsRecordId==" + holdingsId;
}

private Integer getItemsTotalRecords(String holdingId) {
var itemsResponse = inventoryClient.getItems(
new FolioCqlRequest(getItemsByHoldingIdCql(holdingId), 0, 0)
new FolioCqlRequest(getByHoldingsIdCql(holdingId), 0, 0)
);
return itemsResponse.getTotalRecords();
}

private int getBoundWithTotal(HoldingsRecord holdings) {
var getByHoldingsIdCql = getByHoldingsIdCql(holdings.getId());
var request = new FolioCqlRequest(getByHoldingsIdCql, 0, 0);
var response = inventoryClient.getBoundWithParts(request);
return response.getTotalRecords();
}

private String buildIdOrCql(List<String> ids) {
return ids.stream().map(id -> "id==" + id).collect(java.util.stream.Collectors.joining(" or "));
}

private boolean isItemBoundWithHoldings(Item item, HoldingsRecord holdings) {
return !StringUtils.equals(item.getHoldingsRecordId(), holdings.getId());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,33 @@ public RtacHolding mapForItemTypeFrom(RtacHolding existingRtacHolding, Item item
return newRtacHolding;
}

public RtacHolding mapForBoundWithItemTypeFrom(RtacHolding holdingsRtacHolding, RtacHolding itemRtacHolding) {
var newRtacHolding = new RtacHolding();
newRtacHolding.setId(itemRtacHolding.getId());
newRtacHolding.setType(TypeEnum.ITEM);
newRtacHolding.setInstanceId(holdingsRtacHolding.getInstanceId());
newRtacHolding.setHoldingsId(holdingsRtacHolding.getHoldingsId());
newRtacHolding.setBarcode(itemRtacHolding.getBarcode());
newRtacHolding.setCallNumber(itemRtacHolding.getCallNumber());
newRtacHolding.setHoldingsCopyNumber(holdingsRtacHolding.getHoldingsCopyNumber());
newRtacHolding.setItemCopyNumber(itemRtacHolding.getItemCopyNumber());
newRtacHolding.setVolume(itemRtacHolding.getVolume());
newRtacHolding.setEffectiveShelvingOrder(itemRtacHolding.getEffectiveShelvingOrder());
newRtacHolding.setStatus(itemRtacHolding.getStatus());
newRtacHolding.setSuppressFromDiscovery(itemRtacHolding.getSuppressFromDiscovery());
newRtacHolding.setLocation(itemRtacHolding.getLocation());
newRtacHolding.setLibrary(itemRtacHolding.getLibrary());
newRtacHolding.setMaterialType(itemRtacHolding.getMaterialType());
newRtacHolding.setTemporaryLoanType(itemRtacHolding.getTemporaryLoanType());
newRtacHolding.setPermanentLoanType(itemRtacHolding.getPermanentLoanType());
newRtacHolding.setHoldingsStatements(holdingsRtacHolding.getHoldingsStatements());
newRtacHolding.setHoldingsStatementsForIndexes(holdingsRtacHolding.getHoldingsStatementsForIndexes());
newRtacHolding.setHoldingsStatementsForSupplements(holdingsRtacHolding.getHoldingsStatementsForSupplements());
newRtacHolding.setNotes(holdingsRtacHolding.getNotes());
newRtacHolding.setIsBoundWith(true);
return newRtacHolding;
}

public RtacHolding mapForPieceTypeFrom(RtacHolding existingRtacHolding, HoldingsRecord holding) {
var newRtacHolding = new RtacHolding();
newRtacHolding.setId(existingRtacHolding.getId());
Expand Down
Loading