Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use async processing to avoid database timeouts #437

Merged
merged 3 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ ext {
springBootVersion = '2.6.6'
springVersion = '6.0.6'
springOauth2Version = "2.5.1.RELEASE"
springDocVersion = '1.6.14'
springDocVersion = '2.2.0'
lombokVersion = '1.18.26'
junit5Version = '5.9.2'
radarSpringAuthVersion = '1.2.0'
Expand Down Expand Up @@ -68,7 +68,7 @@ dependencies {
runtimeOnly("org.hibernate.validator:hibernate-validator:$hibernateValidatorVersion")

// Open API spec
implementation(group: 'org.springdoc', name: 'springdoc-openapi-ui', version: springDocVersion)
implementation(group: 'org.springdoc', name: 'springdoc-openapi-starter-webmvc-ui', version: springDocVersion)


//runtimeOnly('org.springframework.boot:spring-boot-devtools')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.radarbase.appserver.event.state.dto.DataMessageStateEventDto;
import org.radarbase.appserver.event.state.dto.NotificationStateEventDto;
import org.radarbase.appserver.service.DataMessageStateEventService;
import org.radarbase.appserver.service.NotificationStateEventService;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionalEventListener;

import java.util.Map;

@Slf4j
@Component
Expand All @@ -55,7 +59,9 @@ public MessageStateEventListener(ObjectMapper objectMapper,
*
* @param event the event to respond to
*/
@EventListener(value = NotificationStateEventDto.class)
@Transactional(propagation = Propagation.REQUIRES_NEW)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have had transaction lock issues with postgres (aws rds) when using Propagation.REQUIRES_NEW previously, @mpgxvii can you elaborate the issues we faced previously.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, using TransactionalEventListener + Async was essential, because it only opens a new transaction once the old one is committed. In earlier tests with other permutations, the transactions would overlap and block the HikariCP thread pool.

Copy link
Member

@mpgxvii mpgxvii Sep 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok I see, sounds good. Yes we had to remove the REQUIRES_NEW propagation previously because it was resulting in "Too many connections" issue which was blocking other transactions and causing inconsistent states.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mpgxvii perhaps we can test this with our stage cluster?

@TransactionalEventListener(classes = NotificationStateEventDto.class)
@Async
public void onNotificationStateChange(NotificationStateEventDto event) {
String info = convertMapToString(event.getAdditionalInfo());
log.debug("ID: {}, STATE: {}", event.getNotification().getId(), event.getState());
Expand All @@ -65,7 +71,9 @@ public void onNotificationStateChange(NotificationStateEventDto event) {
notificationStateEventService.addNotificationStateEvent(eventEntity);
}

@EventListener(value = DataMessageStateEventDto.class)
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener(value = DataMessageStateEventDto.class)
@Async
public void onDataMessageStateChange(DataMessageStateEventDto event) {
String info = convertMapToString(event.getAdditionalInfo());
log.debug("ID: {}, STATE: {}", event.getDataMessage().getId(), event.getState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.radarbase.appserver.event.state.dto.NotificationStateEventDto;
import org.radarbase.appserver.event.state.dto.TaskStateEventDto;
import org.radarbase.appserver.service.TaskStateEventService;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionalEventListener;

import java.util.Map;

Expand All @@ -51,7 +56,9 @@ public TaskStateEventListener(ObjectMapper objectMapper,
*
* @param event the event to respond to
*/
@EventListener(value = TaskStateEventDto.class)
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener(classes = TaskStateEventDto.class)
@Async
public void onTaskStateChange(TaskStateEventDto event) {
String info = convertMapToString(event.getAdditionalInfo());
log.debug("ID: {}, STATE: {}", event.getTask().getId(), event.getState());
Expand Down
24 changes: 17 additions & 7 deletions src/main/java/org/radarbase/appserver/service/GithubClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,23 @@
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
@Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
public class GithubClient {
private static final String GITHUB_API_URI = "api.github.com";
private static final String GITHUB_API_ACCEPT_HEADER = "application/vnd.github.v3+json";
private final transient HttpClient client;

@Nonnull
private final transient String authorizationHeader;

private transient final Duration httpTimeout;
private transient final Executor executor;

@Value("${security.github.client.maxContentLength:1000000}")
private transient int maxContentLength;
Expand All @@ -64,11 +68,11 @@ public GithubClient(
@Value("${security.github.client.token:}") String githubToken) {
this.authorizationHeader = githubToken != null ? "Bearer " + githubToken.trim() : "";
this.httpTimeout = Duration.ofSeconds(httpTimeout);
this.client = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.followRedirects(HttpClient.Redirect.NORMAL)
.connectTimeout(this.httpTimeout)
.build();
this.executor = new ThreadPoolExecutor(0,
8,
30,
TimeUnit.SECONDS,
new SynchronousQueue<>());
}

public String getGithubContent(String url) throws IOException, InterruptedException {
Expand All @@ -94,10 +98,16 @@ public String getGithubContent(String url) throws IOException, InterruptedExcept
}

private HttpResponse<InputStream> makeRequest(URI uri) throws InterruptedException {
HttpClient client = HttpClient.newBuilder()
.executor(executor)
.version(HttpClient.Version.HTTP_1_1)
.followRedirects(HttpClient.Redirect.NORMAL)
.connectTimeout(this.httpTimeout)
.build();
try {
return client.send(getRequest(uri), HttpResponse.BodyHandlers.ofInputStream());
} catch (IOException ex) {
log.error("Failed to retrieve data from github: {}", ex.toString());
log.error("Failed to retrieve data from github {}: {}", uri, ex.toString());
throw new ResponseStatusException(HttpStatus.BAD_GATEWAY, "Github responded with an error.");
}
}
Expand Down
20 changes: 11 additions & 9 deletions src/main/java/org/radarbase/appserver/service/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,16 @@ private void addTaskStateEvent(Task t, TaskState state, Instant time) {
@Transactional
public Task updateTaskStatus(Task oldTask, TaskState state) {
User user = oldTask.getUser();
if (this.taskRepository.existsByUserIdAndNameAndTimestamp(user.getId(), oldTask.getName(), oldTask.getTimestamp())) {
if (state.equals(TaskState.COMPLETED)) {
oldTask.setCompleted(true);
oldTask.setTimeCompleted(Timestamp.from(Instant.now()));
}
oldTask.setStatus(state);
return this.taskRepository.saveAndFlush(oldTask);
} else throw new NotFoundException(
"The Task does not exists. Please Use add endpoint");

if (!this.taskRepository.existsByUserIdAndNameAndTimestamp(user.getId(), oldTask.getName(), oldTask.getTimestamp())) {
throw new NotFoundException("The Task " + oldTask.getId() + " does not exist to set to state " + state + ". Please Use add endpoint");
}

if (state.equals(TaskState.COMPLETED)) {
oldTask.setCompleted(true);
oldTask.setTimeCompleted(Timestamp.from(Instant.now()));
}
oldTask.setStatus(state);
return this.taskRepository.saveAndFlush(oldTask);
}
}
2 changes: 1 addition & 1 deletion src/main/resources/application-dev.properties
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ security.radar.managementportal.url=http://localhost:8081
#security.oauth2.client.userAuthorizationUri=
# Github Authentication
security.github.client.token=
security.github.client.timeout=PT10s
security.github.client.timeout=10
# max content size 1 MB
security.github.client.maxContentLength=1000000
security.github.cache.size=10000
Expand Down
Loading