Skip to content

Commit

Permalink
feat(reindex) Implement endpoint to run merge reindex stage only for …
Browse files Browse the repository at this point in the history
…failed ranges (#721)

* feat(reindex): Extend range tables with the status column

- add status, failCause columns to merge/upload range tables
- implement status population logic

Closes: MSEARCH-870
  • Loading branch information
viacheslavkol authored Jan 7, 2025
1 parent 3e306ed commit e0195a4
Show file tree
Hide file tree
Showing 19 changed files with 256 additions and 26 deletions.
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
* Description ([ISSUE](https://folio-org.atlassian.net/browse/ISSUE))

### New APIs versions
* Provides `API_NAME vX.Y`
* Provides `indices v1.1`
* Requires `API_NAME vX.Y`

### Features
* Move Instance sub-entities population from database trigger to code ([MSEARCH-887](https://folio-org.atlassian.net/browse/MSEARCH-887))
* Update reindex merge failed status only for failed entity type ([MSEARCH-909](https://folio-org.atlassian.net/browse/MSEARCH-909))
* Extend reindex range tables with status, fail_cause columns ([MSEARCH-870](https://folio-org.atlassian.net/browse/MSEARCH-870))
* Implement scheduled indexing for instance sub-resources ([MSEARCH-922](https://folio-org.atlassian.net/browse/MSEARCH-922))
* Implement endpoint to run merge reindex stage only for failed ranges ([MSEARCH-906](https://folio-org.atlassian.net/browse/MSEARCH-906))

### Bug fixes
* Remove shelving order calculation for local call-number types
Expand Down
22 changes: 20 additions & 2 deletions descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"provides": [
{
"id": "indices",
"version": "1.0",
"version": "1.1",
"handlers": [
{
"methods": [
Expand Down Expand Up @@ -111,6 +111,18 @@
"modulePermissions": [
"user-tenants.collection.get"
]
},
{
"methods": [
"POST"
],
"pathPattern": "/search/index/instance-records/reindex/merge/failed",
"permissionsRequired": [
"search.index.instance-records.reindex.merge.failed.post"
],
"modulePermissions": [
"user-tenants.collection.get"
]
}
]
},
Expand Down Expand Up @@ -696,6 +708,11 @@
"displayName": "Search - starts inventory instance records reindex upload operation",
"description": "Starts inventory instance records reindex upload operation"
},
{
"permissionName": "search.index.instance-records.reindex.merge.failed.post",
"displayName": "Search - starts inventory instance records reindex merge failed operation",
"description": "Starts inventory instance records reindexing for failed merge ranges"
},
{
"permissionName": "search.instances.collection.get",
"displayName": "Search - searches instances by given query",
Expand Down Expand Up @@ -896,7 +913,8 @@
"subPermissions": [
"search.index.instance-records.reindex.status.get",
"search.index.instance-records.reindex.full.post",
"search.index.instance-records.reindex.upload.post"
"search.index.instance-records.reindex.upload.post",
"search.index.instance-records.reindex.failed.post"
]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ public ResponseEntity<Void> reindexUploadInstanceRecords(String tenantId, Reinde
return ResponseEntity.ok().build();
}

@Override
public ResponseEntity<Void> reindexFailedMergeRanges(String tenantId) {
reindexService.submitFailedMergeRangesReindex(tenantId);
return ResponseEntity.ok().build();
}

@Override
public ResponseEntity<ReindexJob> reindexInventoryRecords(String tenantId, ReindexRequest request) {
log.info("Attempting to start reindex for inventory [tenant: {}]", tenantId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public List<MergeRangeEntity> fetchMergeRanges(ReindexEntityType entityType) {
return repositories.get(entityType).getMergeRanges();
}

public List<MergeRangeEntity> fetchFailedMergeRanges() {
return repositories.values().iterator().next().getFailedMergeRanges();
}

public void updateStatus(ReindexEntityType entityType, String rangeId, ReindexRangeStatus status, String failCause) {
var repository = repositories.get(entityType);
repository.updateRangeStatus(UUID.fromString(rangeId), Timestamp.from(Instant.now()), status, failCause);
Expand Down
39 changes: 35 additions & 4 deletions src/main/java/org/folio/search/service/reindex/ReindexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -66,7 +67,7 @@ public ReindexService(ConsortiumTenantService consortiumService,
public CompletableFuture<Void> submitFullReindex(String tenantId, IndexSettings indexSettings) {
log.info("submitFullReindex:: for [tenantId: {}]", tenantId);

validateTenant(tenantId);
validateTenant("submitFullReindex", tenantId);

reindexCommonService.deleteAllRecords();
statusService.recreateMergeStatusRecords();
Expand Down Expand Up @@ -140,6 +141,36 @@ private CompletableFuture<Void> submitUploadReindex(String tenantId,
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

public CompletableFuture<Void> submitFailedMergeRangesReindex(String tenantId) {
log.info("submitFailedMergeRangesReindex:: for [tenantId: {}]", tenantId);

validateTenant("submitFailedMergeRangesReindex", tenantId);

var failedRanges = mergeRangeService.fetchFailedMergeRanges();
if (CollectionUtils.isEmpty(failedRanges)) {
log.info("submitFailedMergeRangesReindex:: no failed ranges found");
return CompletableFuture.completedFuture(null);
}

log.info("submitFailedMergeRangesReindex:: for [tenantId: {}, count: {}]", tenantId, failedRanges.size());
var entityTypes = failedRanges.stream()
.map(MergeRangeEntity::getEntityType)
.collect(Collectors.toSet());
statusService.updateReindexMergeInProgress(entityTypes);

var futures = new ArrayList<>();
for (var rangeEntity : failedRanges) {
var future = CompletableFuture.runAsync(() ->
executionService.executeSystemUserScoped(rangeEntity.getTenantId(), () -> {
inventoryService.publishReindexRecordsRange(rangeEntity);
return null;
}), reindexPublisherExecutor);
futures.add(future);
}

return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

private void recreateIndices(String tenantId, List<ReindexEntityType> entityTypes, IndexSettings indexSettings) {
for (var reindexEntityType : entityTypes) {
reindexCommonService.recreateIndex(reindexEntityType, tenantId, indexSettings);
Expand Down Expand Up @@ -180,7 +211,7 @@ private void publishRecordsRange(String tenantId) {
}

private void validateUploadReindex(String tenantId, List<ReindexEntityType> entityTypes) {
validateTenant(tenantId);
validateTenant("submitUploadReindex", tenantId);

var statusesByType = statusService.getStatusesByType();

Expand All @@ -207,10 +238,10 @@ private void validateUploadReindex(String tenantId, List<ReindexEntityType> enti
}
}

private void validateTenant(String tenantId) {
private void validateTenant(String operation, String tenantId) {
var central = consortiumService.getCentralTenant(tenantId);
if (central.isPresent() && !central.get().equals(tenantId)) {
log.info("initFullReindex:: could not be started for consortium member tenant [tenantId: {}]", tenantId);
log.info("{}:: could not be started for consortium member tenant [tenantId: {}]", operation, tenantId);
throw RequestValidationException.memberTenantNotAllowedException(tenantId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.log4j.Log4j2;
import org.folio.search.converter.ReindexStatusMapper;
Expand Down Expand Up @@ -93,6 +94,11 @@ public void updateReindexMergeStarted(ReindexEntityType entityType, int totalMer
statusRepository.setMergeReindexStarted(entityType, totalMergeRanges);
}

public void updateReindexMergeInProgress(Set<ReindexEntityType> entityTypes) {
log.info("updateReindexMergeInProgress:: for [entityTypes: {}]", entityTypes);
statusRepository.setMergeInProgress(entityTypes);
}

public void updateReindexUploadStarted(ReindexEntityType entityType, int totalUploadRanges) {
log.info("updateReindexUploadStarted:: for [entityType: {}, totalMergeRanges: {}]", entityType, totalUploadRanges);
statusRepository.setUploadReindexStarted(entityType, totalUploadRanges);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public abstract class MergeRangeRepository extends ReindexJdbcRepository {

private static final String SELECT_MERGE_RANGES_BY_ENTITY_TYPE = "SELECT * FROM %s WHERE entity_type = ?;";

private static final String SELECT_FAILED_MERGE_RANGES = "SELECT * FROM %s WHERE status = 'FAIL';";

protected MergeRangeRepository(JdbcTemplate jdbcTemplate,
JsonConverter jsonConverter,
FolioExecutionContext context) {
Expand Down Expand Up @@ -62,6 +64,12 @@ public List<MergeRangeEntity> getMergeRanges() {
return jdbcTemplate.query(sql, mergeRangeEntityRowMapper(), entityType().getType());
}

public List<MergeRangeEntity> getFailedMergeRanges() {
var fullTableName = getFullTableName(context, MERGE_RANGE_TABLE);
var sql = SELECT_FAILED_MERGE_RANGES.formatted(fullTableName);
return jdbcTemplate.query(sql, mergeRangeEntityRowMapper());
}

public void truncateMergeRanges() {
JdbcUtils.truncateTable(MERGE_RANGE_TABLE, jdbcTemplate, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.folio.search.model.reindex.ReindexStatusEntity;
Expand Down Expand Up @@ -124,6 +125,16 @@ public void setMergeReindexFailed(List<ReindexEntityType> entityTypes) {
jdbcTemplate.update(sql, ReindexStatus.MERGE_FAILED.name(), Timestamp.from(Instant.now()));
}

public void setMergeInProgress(Set<ReindexEntityType> entityTypes) {
var inTypes = entityTypes.stream()
.map(entityType -> "'%s'".formatted(entityType.name()))
.collect(Collectors.joining(","));
var fullTableName = getFullTableName(context, REINDEX_STATUS_TABLE);
var sql = UPDATE_FOR_ENTITIES_SQL.formatted(fullTableName, STATUS_COLUMN + " = ?", inTypes);

jdbcTemplate.update(sql, ReindexStatus.MERGE_IN_PROGRESS.name());
}

public void saveReindexStatusRecords(List<ReindexStatusEntity> statusRecords) {
var fullTableName = getFullTableName(context, REINDEX_STATUS_TABLE);
jdbcTemplate.batchUpdate(INSERT_REINDEX_STATUS_SQL.formatted(fullTableName), statusRecords, 10,
Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/swagger.api/mod-search.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ paths:
/search/index/instance-records/reindex/upload:
$ref: 'paths/reindex-instance-records/reindex-instance-records-upload.yaml'

/search/index/instance-records/reindex/merge/failed:
$ref: 'paths/reindex-instance-records/reindex-instance-records-merge-failed.yaml'

/search/config/languages:
$ref: 'paths/search-config/search-config-languages.yaml'

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
post:
operationId: reindexFailedMergeRanges
summary: Failed Merge Ranges Re-Index
description: Initiates reindexing of failed merge ranges for inventory instance records
tags:
- index-management
parameters:
- $ref: '../../parameters/x-okapi-tenant-header.yaml'
responses:
'200':
description: Reindexing of failed merge ranges has been started
'400':
$ref: '../../responses/badRequestResponse.yaml'
'500':
$ref: '../../responses/internalServerErrorResponse.yaml'
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.folio.search.controller;

import static org.folio.search.support.base.ApiEndpoints.createIndicesPath;
import static org.folio.search.support.base.ApiEndpoints.reindexFailedPath;
import static org.folio.search.support.base.ApiEndpoints.reindexFullPath;
import static org.folio.search.support.base.ApiEndpoints.reindexInstanceRecordsStatus;
import static org.folio.search.support.base.ApiEndpoints.reindexUploadPath;
Expand Down Expand Up @@ -55,8 +56,8 @@
import org.opensearch.core.index.Index;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.bean.override.mockito.MockitoBean;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockHttpServletRequestBuilder;

Expand All @@ -71,13 +72,13 @@ class IndexManagementControllerTest {

@Autowired
private MockMvc mockMvc;
@MockBean
@MockitoBean
private IndexService indexService;
@MockBean
@MockitoBean
private ResourceService resourceService;
@MockBean
@MockitoBean
private ReindexService reindexService;
@MockBean
@MockitoBean
private ReindexStatusService reindexStatusService;

@Test
Expand Down Expand Up @@ -121,6 +122,15 @@ void submitReindexUpload_positive_withSettings() throws Exception {
.andExpect(status().isOk());
}

@Test
void submitReindexMergeFailed_positive() throws Exception {
when(reindexService.submitFailedMergeRangesReindex(TENANT_ID)).thenReturn(new CompletableFuture<>());

mockMvc.perform(post(reindexFailedPath())
.header(XOkapiHeaders.TENANT, TENANT_ID))
.andExpect(status().isOk());
}

@Test
void createIndex_positive() throws Exception {
when(indexService.createIndex(RESOURCE, TENANT_ID))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,13 @@ void saveEntities(ReindexRecordsEvent.ReindexRecordType recordType) {
verifyNoInteractions(instanceChildrenResourceService);
}
}

@Test
void fetchFailedMergeRanges() {
// act
service.fetchFailedMergeRanges();

// assert
verify(repositoryMap.values().iterator().next()).getFailedMergeRanges();
}
}
Loading

0 comments on commit e0195a4

Please sign in to comment.