diff --git a/.gitignore b/.gitignore index 3420b922..9c5d913e 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,5 @@ out/ .vscode/ src/main/resources/.env + +docker-compose.yml \ No newline at end of file diff --git a/src/main/java/com/gachtaxi/domain/matching/algorithm/service/MockMatchingAlgorithmService.java b/src/main/java/com/gachtaxi/domain/matching/algorithm/service/MockMatchingAlgorithmService.java index 31ea0762..fe90a21a 100644 --- a/src/main/java/com/gachtaxi/domain/matching/algorithm/service/MockMatchingAlgorithmService.java +++ b/src/main/java/com/gachtaxi/domain/matching/algorithm/service/MockMatchingAlgorithmService.java @@ -50,7 +50,7 @@ public Optional findRoom(Long userId, double startLongitude, dou ACTIVE 상태인 방만 필터링 */ matchingRooms = matchingRooms.stream() - .filter(MatchingRoom::isActiveMatchingRoom) + .filter(MatchingRoom::isActive) .toList(); /* 태그 조건이 있는 경우에 태그정보까지 필터링 diff --git a/src/main/java/com/gachtaxi/domain/matching/aop/SseSubscribeRequired.java b/src/main/java/com/gachtaxi/domain/matching/aop/SseSubscribeRequired.java new file mode 100644 index 00000000..a75b4a1f --- /dev/null +++ b/src/main/java/com/gachtaxi/domain/matching/aop/SseSubscribeRequired.java @@ -0,0 +1,12 @@ +package com.gachtaxi.domain.matching.aop; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface SseSubscribeRequired { + +} diff --git a/src/main/java/com/gachtaxi/domain/matching/aop/SseSubscribeRequiredAop.java b/src/main/java/com/gachtaxi/domain/matching/aop/SseSubscribeRequiredAop.java new file mode 100644 index 00000000..fce23e7d --- /dev/null +++ b/src/main/java/com/gachtaxi/domain/matching/aop/SseSubscribeRequiredAop.java @@ -0,0 +1,52 @@ +package com.gachtaxi.domain.matching.aop; + +import com.gachtaxi.domain.matching.common.controller.ResponseMessage; +import com.gachtaxi.domain.matching.common.exception.ControllerNotHasCurrentMemberIdException; +import com.gachtaxi.domain.matching.common.service.AutoMatchingService; +import com.gachtaxi.global.auth.jwt.annotation.CurrentMemberId; +import com.gachtaxi.global.common.response.ApiResponse; +import java.lang.reflect.Parameter; +import lombok.RequiredArgsConstructor; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.reflect.MethodSignature; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Component; + +@Aspect +@Component +@RequiredArgsConstructor +public class SseSubscribeRequiredAop { + + private final AutoMatchingService autoMatchingService; + + @Around("@annotation(com.gachtaxi.domain.matching.aop.SseSubscribeRequired)") + public Object checkSseSubscribe(ProceedingJoinPoint proceedingJoinPoint) throws Throwable{ + Long memberId = null; + MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature(); + Parameter[] parameters = signature.getMethod().getParameters(); + + for (int i = 0; i < parameters.length; i++) { + Parameter parameter = parameters[i]; + if (parameter.getType().equals(Long.class) && parameter.isAnnotationPresent( + CurrentMemberId.class)) { + memberId = (Long) proceedingJoinPoint.getArgs()[i]; + break; + } + } + + if (memberId == null) { + throw new ControllerNotHasCurrentMemberIdException(); + } + + if (!this.autoMatchingService.isSseSubscribed(memberId)) { + return ApiResponse.response( + HttpStatus.BAD_REQUEST, + ResponseMessage.NOT_SUBSCRIBED_SSE.getMessage() + ); + } + + return proceedingJoinPoint.proceed(); + } +} diff --git a/src/main/java/com/gachtaxi/domain/matching/common/controller/AutoMatchingController.java b/src/main/java/com/gachtaxi/domain/matching/common/controller/AutoMatchingController.java index cd32b3cd..95c2296e 100644 --- a/src/main/java/com/gachtaxi/domain/matching/common/controller/AutoMatchingController.java +++ b/src/main/java/com/gachtaxi/domain/matching/common/controller/AutoMatchingController.java @@ -1,10 +1,14 @@ package com.gachtaxi.domain.matching.common.controller; +import com.gachtaxi.domain.matching.aop.SseSubscribeRequired; +import com.gachtaxi.domain.matching.common.dto.request.AutoMatchingCancelledRequest; import com.gachtaxi.domain.matching.common.dto.request.AutoMatchingPostRequest; import com.gachtaxi.domain.matching.common.dto.response.AutoMatchingPostResponse; import com.gachtaxi.domain.matching.common.service.AutoMatchingService; +import com.gachtaxi.global.auth.jwt.annotation.CurrentMemberId; import com.gachtaxi.global.common.response.ApiResponse; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; @@ -15,6 +19,7 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +@Slf4j @RestController @RequiredArgsConstructor @RequestMapping("/api/matching/auto") @@ -23,31 +28,33 @@ public class AutoMatchingController { private final AutoMatchingService autoMatchingService; @GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE) - public SseEmitter subscribeSse(@RequestParam Long memberId) { - // TODO: 인가 로직 완성되면 해당 멤버의 아이디를 가져오도록 변경 -// Long memberId = 1L; - + public SseEmitter subscribeSse(@CurrentMemberId Long memberId) { return this.autoMatchingService.handleSubscribe(memberId); } @PostMapping("/request") + @SseSubscribeRequired public ApiResponse requestMatching( - @RequestParam Long memberId, + @CurrentMemberId Long memberId, @RequestBody AutoMatchingPostRequest autoMatchingPostRequest ) { - // TODO: 인가 로직 완성되면 해당 멤버의 아이디를 가져오도록 변경 -// Long memberId = 1L; - if (!this.autoMatchingService.isSseSubscribed(memberId)) { - return ApiResponse.response( - HttpStatus.BAD_REQUEST, - ResponseMessage.NOT_SUBSCRIBED_SSE.getMessage() - ); - } - return ApiResponse.response( HttpStatus.OK, ResponseMessage.AUTO_MATCHING_REQUEST_ACCEPTED.getMessage(), this.autoMatchingService.handlerAutoRequestMatching(memberId, autoMatchingPostRequest) ); } + + @PostMapping("/cancel") + @SseSubscribeRequired + public ApiResponse cancelMatching( + @CurrentMemberId Long memberId, + @RequestBody AutoMatchingCancelledRequest autoMatchingCancelledRequest + ) { + return ApiResponse.response( + HttpStatus.OK, + ResponseMessage.AUTO_MATCHING_REQUEST_CANCELLED.getMessage(), + this.autoMatchingService.handlerAutoCancelMatching(memberId, autoMatchingCancelledRequest) + ); + } } \ No newline at end of file diff --git a/src/main/java/com/gachtaxi/domain/matching/common/controller/ResponseMessage.java b/src/main/java/com/gachtaxi/domain/matching/common/controller/ResponseMessage.java index 63dcff16..859df93d 100644 --- a/src/main/java/com/gachtaxi/domain/matching/common/controller/ResponseMessage.java +++ b/src/main/java/com/gachtaxi/domain/matching/common/controller/ResponseMessage.java @@ -12,7 +12,8 @@ public enum ResponseMessage { // auto matching AUTO_MATCHING_REQUEST_ACCEPTED("자동 매칭 요청 전송에 성공했습니다."), - NOT_SUBSCRIBED_SSE("SSE 구독 후 자동 매칭을 요청할 수 있습니다."); + NOT_SUBSCRIBED_SSE("SSE 구독 후 자동 매칭을 요청할 수 있습니다."), + AUTO_MATCHING_REQUEST_CANCELLED("자동 매칭 취소 요청 전송에 성공했습니다."); private final String message; } diff --git a/src/main/java/com/gachtaxi/domain/matching/common/dto/enums/AutoMatchingStatus.java b/src/main/java/com/gachtaxi/domain/matching/common/dto/enums/AutoMatchingStatus.java index c1dee135..03401cad 100644 --- a/src/main/java/com/gachtaxi/domain/matching/common/dto/enums/AutoMatchingStatus.java +++ b/src/main/java/com/gachtaxi/domain/matching/common/dto/enums/AutoMatchingStatus.java @@ -7,7 +7,7 @@ @AllArgsConstructor public enum AutoMatchingStatus { REQUESTED("REQUESTED"), - REJECTED("REJECTED"); - + REJECTED("REJECTED"), + CANCELLED("CANCELLED"); private final String value; } diff --git a/src/main/java/com/gachtaxi/domain/matching/common/dto/request/AutoMatchingCancelledRequest.java b/src/main/java/com/gachtaxi/domain/matching/common/dto/request/AutoMatchingCancelledRequest.java new file mode 100644 index 00000000..bd0c66fe --- /dev/null +++ b/src/main/java/com/gachtaxi/domain/matching/common/dto/request/AutoMatchingCancelledRequest.java @@ -0,0 +1,7 @@ +package com.gachtaxi.domain.matching.common.dto.request; + +public record AutoMatchingCancelledRequest( + Long roomId +) { + +} diff --git a/src/main/java/com/gachtaxi/domain/matching/common/entity/MatchingRoom.java b/src/main/java/com/gachtaxi/domain/matching/common/entity/MatchingRoom.java index e625fd1e..0afef65e 100644 --- a/src/main/java/com/gachtaxi/domain/matching/common/entity/MatchingRoom.java +++ b/src/main/java/com/gachtaxi/domain/matching/common/entity/MatchingRoom.java @@ -2,6 +2,7 @@ import com.gachtaxi.domain.matching.algorithm.dto.FindRoomResult; import com.gachtaxi.domain.matching.common.entity.enums.MatchingRoomStatus; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCreatedEvent; import com.gachtaxi.domain.matching.common.entity.enums.Tags; import com.gachtaxi.domain.members.entity.Members; import com.gachtaxi.global.common.entity.BaseEntity; @@ -20,10 +21,11 @@ import lombok.Builder; import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.Setter; @Entity @Table(name = "matching_room") -@Builder +@Builder(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PROTECTED) @AllArgsConstructor(access = AccessLevel.PRIVATE) public class MatchingRoom extends BaseEntity { @@ -40,6 +42,8 @@ public class MatchingRoom extends BaseEntity { private List memberMatchingRoomChargingInfo; @ManyToOne(cascade = CascadeType.PERSIST, optional = false) + @Getter + @Setter private Members roomMaster; @Column(name = "title", nullable = false) @@ -58,9 +62,37 @@ public class MatchingRoom extends BaseEntity { @Enumerated(EnumType.STRING) private MatchingRoomStatus matchingRoomStatus; - public boolean isActiveMatchingRoom() { + public boolean isActive() { return this.matchingRoomStatus == MatchingRoomStatus.ACTIVE; } + + public void changeRoomMaster(Members members) { + this.setRoomMaster(members); + } + + public void cancelMatchingRoom() { + this.matchingRoomStatus = MatchingRoomStatus.CANCELLED; + } + + public void completeMatchingRoom() { + this.matchingRoomStatus = MatchingRoomStatus.COMPLETE; + } + + public boolean isFull(int size) { + return size == totalCharge; + } + + public static MatchingRoom activeOf(MatchRoomCreatedEvent matchRoomCreatedEvent, Members members, Route route) { + return MatchingRoom.builder() + .capacity(matchRoomCreatedEvent.maxCapacity()) + .roomMaster(members) + .title(matchRoomCreatedEvent.title()) + .description(matchRoomCreatedEvent.description()) + .route(route) + .totalCharge(matchRoomCreatedEvent.expectedTotalCharge()) + .matchingRoomStatus(MatchingRoomStatus.ACTIVE) + .build(); + } public boolean containsTag(Tags tag) { return this.matchingRoomTagInfo.stream() .anyMatch(tagInfo -> tagInfo.matchesTag(tag)); diff --git a/src/main/java/com/gachtaxi/domain/matching/common/entity/MatchingRoomTagInfo.java b/src/main/java/com/gachtaxi/domain/matching/common/entity/MatchingRoomTagInfo.java index 8bbad8a9..9e85496c 100644 --- a/src/main/java/com/gachtaxi/domain/matching/common/entity/MatchingRoomTagInfo.java +++ b/src/main/java/com/gachtaxi/domain/matching/common/entity/MatchingRoomTagInfo.java @@ -16,7 +16,7 @@ @Table(name = "matching_room_tag_info") @NoArgsConstructor(access = AccessLevel.PROTECTED) @AllArgsConstructor(access = AccessLevel.PRIVATE) -@Builder +@Builder(access = AccessLevel.PRIVATE) public class MatchingRoomTagInfo extends BaseEntity { @ManyToOne @@ -25,6 +25,13 @@ public class MatchingRoomTagInfo extends BaseEntity { @Enumerated(EnumType.STRING) private Tags tags; + public static MatchingRoomTagInfo of(MatchingRoom matchingRoom, Tags tag) { + return MatchingRoomTagInfo.builder() + .matchingRoom(matchingRoom) + .tags(tag) + .build(); + } + public boolean matchesTag(Tags tag) { return this.tags == tag; } diff --git a/src/main/java/com/gachtaxi/domain/matching/common/entity/MemberMatchingRoomChargingInfo.java b/src/main/java/com/gachtaxi/domain/matching/common/entity/MemberMatchingRoomChargingInfo.java index aa4e11b3..ec1a7e59 100644 --- a/src/main/java/com/gachtaxi/domain/matching/common/entity/MemberMatchingRoomChargingInfo.java +++ b/src/main/java/com/gachtaxi/domain/matching/common/entity/MemberMatchingRoomChargingInfo.java @@ -14,6 +14,7 @@ import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; +import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; @@ -26,10 +27,11 @@ ) @NoArgsConstructor(access = AccessLevel.PROTECTED) @AllArgsConstructor(access = AccessLevel.PRIVATE) -@Builder +@Builder(access = AccessLevel.PRIVATE) public class MemberMatchingRoomChargingInfo extends BaseEntity { @ManyToOne(fetch = FetchType.LAZY) + @Getter private Members members; @ManyToOne(fetch = FetchType.LAZY) @@ -40,5 +42,27 @@ public class MemberMatchingRoomChargingInfo extends BaseEntity { private Integer charge; @Enumerated(EnumType.STRING) - private PaymentStatus paymentStatus = PaymentStatus.NOT_PAYED; + private PaymentStatus paymentStatus; + + public void leftMatchingRoom() { + this.paymentStatus = PaymentStatus.LEFT; + } + + public boolean isAlreadyLeft() { + return this.paymentStatus == PaymentStatus.LEFT; + } + + public MemberMatchingRoomChargingInfo joinMatchingRoom() { + this.paymentStatus = PaymentStatus.NOT_PAYED; + return this; + } + + public static MemberMatchingRoomChargingInfo notPayedOf(MatchingRoom matchingRoom, Members members) { + return MemberMatchingRoomChargingInfo.builder() + .matchingRoom(matchingRoom) + .members(members) + .charge(matchingRoom.getTotalCharge()) + .paymentStatus(PaymentStatus.NOT_PAYED) + .build(); + } } diff --git a/src/main/java/com/gachtaxi/domain/matching/common/entity/Route.java b/src/main/java/com/gachtaxi/domain/matching/common/entity/Route.java index 1a942627..2b7f5260 100644 --- a/src/main/java/com/gachtaxi/domain/matching/common/entity/Route.java +++ b/src/main/java/com/gachtaxi/domain/matching/common/entity/Route.java @@ -1,5 +1,6 @@ package com.gachtaxi.domain.matching.common.entity; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCreatedEvent; import com.gachtaxi.global.common.entity.BaseEntity; import jakarta.persistence.Entity; import jakarta.persistence.Table; diff --git a/src/main/java/com/gachtaxi/domain/matching/common/entity/enums/MatchingRoomStatus.java b/src/main/java/com/gachtaxi/domain/matching/common/entity/enums/MatchingRoomStatus.java index ddfcd386..601abd7b 100644 --- a/src/main/java/com/gachtaxi/domain/matching/common/entity/enums/MatchingRoomStatus.java +++ b/src/main/java/com/gachtaxi/domain/matching/common/entity/enums/MatchingRoomStatus.java @@ -1,5 +1,5 @@ package com.gachtaxi.domain.matching.common.entity.enums; public enum MatchingRoomStatus { - PROCESS, COMPLETE, CANCELED, ACTIVE + COMPLETE, CANCELLED, ACTIVE } \ No newline at end of file diff --git a/src/main/java/com/gachtaxi/domain/matching/common/entity/enums/PaymentStatus.java b/src/main/java/com/gachtaxi/domain/matching/common/entity/enums/PaymentStatus.java index c754bfb3..bc0c4be4 100644 --- a/src/main/java/com/gachtaxi/domain/matching/common/entity/enums/PaymentStatus.java +++ b/src/main/java/com/gachtaxi/domain/matching/common/entity/enums/PaymentStatus.java @@ -1,5 +1,5 @@ package com.gachtaxi.domain.matching.common.entity.enums; public enum PaymentStatus { - PAYED, NOT_PAYED, FAILED + PAYED, NOT_PAYED, FAILED, LEFT } diff --git a/src/main/java/com/gachtaxi/domain/matching/common/exception/ControllerNotHasCurrentMemberIdException.java b/src/main/java/com/gachtaxi/domain/matching/common/exception/ControllerNotHasCurrentMemberIdException.java new file mode 100644 index 00000000..3e24768f --- /dev/null +++ b/src/main/java/com/gachtaxi/domain/matching/common/exception/ControllerNotHasCurrentMemberIdException.java @@ -0,0 +1,13 @@ +package com.gachtaxi.domain.matching.common.exception; + +import static com.gachtaxi.domain.matching.common.exception.ErrorMessage.CONTROLLER_NOT_HAS_CURRENT_MEMBER_ID; +import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR; + +import com.gachtaxi.global.common.exception.BaseException; + +public class ControllerNotHasCurrentMemberIdException extends BaseException { + + public ControllerNotHasCurrentMemberIdException() { + super(INTERNAL_SERVER_ERROR, CONTROLLER_NOT_HAS_CURRENT_MEMBER_ID.getMessage()); + } +} diff --git a/src/main/java/com/gachtaxi/domain/matching/common/exception/ErrorMessage.java b/src/main/java/com/gachtaxi/domain/matching/common/exception/ErrorMessage.java index b51b80b8..6f8cd58b 100644 --- a/src/main/java/com/gachtaxi/domain/matching/common/exception/ErrorMessage.java +++ b/src/main/java/com/gachtaxi/domain/matching/common/exception/ErrorMessage.java @@ -9,6 +9,11 @@ public enum ErrorMessage { NO_SUCH_MATCHING_ROOM("해당 매칭 방이 존재하지 않습니다."), NOT_ACTIVE_MATCHING_ROOM("열린 매칭 방이 아닙니다."), + MEMBER_NOT_IN_MATCHING_ROOM("해당 매칭 방에 참가한 멤버가 아닙니다."), + MEMBER_ALREADY_JOINED_MATCHING_ROOM("해당 맴버는 이미 매칭 방에 참가한 멤버입니다"), + MEMBER_ALREADY_LEFT_MATCHING_ROOM("해당 멤버는 이미 매칭 방에서 나간 멤버입니다."), + CONTROLLER_NOT_HAS_CURRENT_MEMBER_ID("해당 컨트롤러는 인가된 멤버 ID가 필요합니다."), + NOT_DEFINED_KAFKA_TEMPLATE("해당 이벤트와 맞는 KafkaTemplate이 정의되지 않았습니다."), DUPLICATED_MATCHING_ROOM("이미 존재하는 매칭 방입니다."), NOT_FOUND_PAGE("페이지 번호는 0 이상이어야 합니다."); diff --git a/src/main/java/com/gachtaxi/domain/matching/common/exception/MemberAlreadyJoinedException.java b/src/main/java/com/gachtaxi/domain/matching/common/exception/MemberAlreadyJoinedException.java new file mode 100644 index 00000000..ad935d43 --- /dev/null +++ b/src/main/java/com/gachtaxi/domain/matching/common/exception/MemberAlreadyJoinedException.java @@ -0,0 +1,13 @@ +package com.gachtaxi.domain.matching.common.exception; + +import static com.gachtaxi.domain.matching.common.exception.ErrorMessage.MEMBER_ALREADY_JOINED_MATCHING_ROOM; +import static org.springframework.http.HttpStatus.CONFLICT; + +import com.gachtaxi.global.common.exception.BaseException; + +public class MemberAlreadyJoinedException extends BaseException { + + public MemberAlreadyJoinedException() { + super(CONFLICT, MEMBER_ALREADY_JOINED_MATCHING_ROOM.getMessage()); + } +} diff --git a/src/main/java/com/gachtaxi/domain/matching/common/exception/MemberAlreadyLeftMatchingRoomException.java b/src/main/java/com/gachtaxi/domain/matching/common/exception/MemberAlreadyLeftMatchingRoomException.java new file mode 100644 index 00000000..344789ea --- /dev/null +++ b/src/main/java/com/gachtaxi/domain/matching/common/exception/MemberAlreadyLeftMatchingRoomException.java @@ -0,0 +1,13 @@ +package com.gachtaxi.domain.matching.common.exception; + +import static com.gachtaxi.domain.matching.common.exception.ErrorMessage.MEMBER_ALREADY_LEFT_MATCHING_ROOM; +import static org.springframework.http.HttpStatus.CONFLICT; + +import com.gachtaxi.global.common.exception.BaseException; + +public class MemberAlreadyLeftMatchingRoomException extends BaseException { + + public MemberAlreadyLeftMatchingRoomException() { + super(CONFLICT, MEMBER_ALREADY_LEFT_MATCHING_ROOM.getMessage()); + } +} diff --git a/src/main/java/com/gachtaxi/domain/matching/common/exception/MemberNotInMatchingRoomException.java b/src/main/java/com/gachtaxi/domain/matching/common/exception/MemberNotInMatchingRoomException.java new file mode 100644 index 00000000..457aac99 --- /dev/null +++ b/src/main/java/com/gachtaxi/domain/matching/common/exception/MemberNotInMatchingRoomException.java @@ -0,0 +1,13 @@ +package com.gachtaxi.domain.matching.common.exception; + +import static com.gachtaxi.domain.matching.common.exception.ErrorMessage.*; +import static org.springframework.http.HttpStatus.BAD_REQUEST; + +import com.gachtaxi.global.common.exception.BaseException; + +public class MemberNotInMatchingRoomException extends BaseException { + + public MemberNotInMatchingRoomException() { + super(BAD_REQUEST, MEMBER_NOT_IN_MATCHING_ROOM.getMessage()); + } +} diff --git a/src/main/java/com/gachtaxi/domain/matching/common/exception/NoSuchMatchingRoomException.java b/src/main/java/com/gachtaxi/domain/matching/common/exception/NoSuchMatchingRoomException.java index 997d6d41..931e7fb6 100644 --- a/src/main/java/com/gachtaxi/domain/matching/common/exception/NoSuchMatchingRoomException.java +++ b/src/main/java/com/gachtaxi/domain/matching/common/exception/NoSuchMatchingRoomException.java @@ -1,11 +1,13 @@ package com.gachtaxi.domain.matching.common.exception; +import static com.gachtaxi.domain.matching.common.exception.ErrorMessage.NO_SUCH_MATCHING_ROOM; +import static org.springframework.http.HttpStatus.NOT_FOUND; + import com.gachtaxi.global.common.exception.BaseException; -import org.springframework.http.HttpStatus; public class NoSuchMatchingRoomException extends BaseException { public NoSuchMatchingRoomException() { - super(HttpStatus.NOT_FOUND, ErrorMessage.NO_SUCH_MATCHING_ROOM.getMessage()); + super(NOT_FOUND, NO_SUCH_MATCHING_ROOM.getMessage()); } } diff --git a/src/main/java/com/gachtaxi/domain/matching/common/exception/NotActiveMatchingRoomException.java b/src/main/java/com/gachtaxi/domain/matching/common/exception/NotActiveMatchingRoomException.java index 88ded883..58197e5a 100644 --- a/src/main/java/com/gachtaxi/domain/matching/common/exception/NotActiveMatchingRoomException.java +++ b/src/main/java/com/gachtaxi/domain/matching/common/exception/NotActiveMatchingRoomException.java @@ -1,11 +1,13 @@ package com.gachtaxi.domain.matching.common.exception; +import static com.gachtaxi.domain.matching.common.exception.ErrorMessage.NOT_ACTIVE_MATCHING_ROOM; +import static org.springframework.http.HttpStatus.BAD_REQUEST; + import com.gachtaxi.global.common.exception.BaseException; -import org.springframework.http.HttpStatus; public class NotActiveMatchingRoomException extends BaseException { public NotActiveMatchingRoomException() { - super(HttpStatus.BAD_REQUEST, ErrorMessage.NOT_ACTIVE_MATCHING_ROOM.getMessage()); + super(BAD_REQUEST, NOT_ACTIVE_MATCHING_ROOM.getMessage()); } } diff --git a/src/main/java/com/gachtaxi/domain/matching/common/exception/NotDefinedKafkaTemplateException.java b/src/main/java/com/gachtaxi/domain/matching/common/exception/NotDefinedKafkaTemplateException.java new file mode 100644 index 00000000..2bb75035 --- /dev/null +++ b/src/main/java/com/gachtaxi/domain/matching/common/exception/NotDefinedKafkaTemplateException.java @@ -0,0 +1,13 @@ +package com.gachtaxi.domain.matching.common.exception; + +import static com.gachtaxi.domain.matching.common.exception.ErrorMessage.NOT_DEFINED_KAFKA_TEMPLATE; +import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR; + +import com.gachtaxi.global.common.exception.BaseException; + +public class NotDefinedKafkaTemplateException extends BaseException { + + public NotDefinedKafkaTemplateException() { + super(INTERNAL_SERVER_ERROR, NOT_DEFINED_KAFKA_TEMPLATE.getMessage()); + } +} diff --git a/src/main/java/com/gachtaxi/domain/matching/common/repository/MemberMatchingRoomChargingInfoRepository.java b/src/main/java/com/gachtaxi/domain/matching/common/repository/MemberMatchingRoomChargingInfoRepository.java index d447c308..2a76f9f1 100644 --- a/src/main/java/com/gachtaxi/domain/matching/common/repository/MemberMatchingRoomChargingInfoRepository.java +++ b/src/main/java/com/gachtaxi/domain/matching/common/repository/MemberMatchingRoomChargingInfoRepository.java @@ -2,12 +2,16 @@ import com.gachtaxi.domain.matching.common.entity.MatchingRoom; import com.gachtaxi.domain.matching.common.entity.MemberMatchingRoomChargingInfo; +import com.gachtaxi.domain.matching.common.entity.enums.PaymentStatus; +import com.gachtaxi.domain.members.entity.Members; import java.util.List; +import java.util.Optional; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; @Repository public interface MemberMatchingRoomChargingInfoRepository extends JpaRepository { - List findByMatchingRoom(MatchingRoom matchingRoom); + List findByMatchingRoomAndPaymentStatus(MatchingRoom matchingRoom, PaymentStatus paymentStatus); + Optional findByMembersAndMatchingRoom(Members members, MatchingRoom matchingRoom); } diff --git a/src/main/java/com/gachtaxi/domain/matching/common/service/AutoMatchingService.java b/src/main/java/com/gachtaxi/domain/matching/common/service/AutoMatchingService.java index 56b960b3..735ba687 100644 --- a/src/main/java/com/gachtaxi/domain/matching/common/service/AutoMatchingService.java +++ b/src/main/java/com/gachtaxi/domain/matching/common/service/AutoMatchingService.java @@ -3,31 +3,29 @@ import com.gachtaxi.domain.matching.algorithm.dto.FindRoomResult; import com.gachtaxi.domain.matching.algorithm.service.MatchingAlgorithmService; import com.gachtaxi.domain.matching.common.dto.enums.AutoMatchingStatus; +import com.gachtaxi.domain.matching.common.dto.request.AutoMatchingCancelledRequest; import com.gachtaxi.domain.matching.common.dto.request.AutoMatchingPostRequest; import com.gachtaxi.domain.matching.common.dto.response.AutoMatchingPostResponse; import com.gachtaxi.domain.matching.common.entity.enums.Tags; -import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchMemberJoinedEvent; -import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCreatedEvent; +import com.gachtaxi.domain.matching.event.MatchingEventFactory; import com.gachtaxi.domain.matching.event.service.kafka.AutoMatchingProducer; import com.gachtaxi.domain.matching.event.service.sse.SseService; -import java.time.LocalDateTime; import java.util.List; import java.util.Optional; -import java.util.UUID; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +@Slf4j @Service @RequiredArgsConstructor public class AutoMatchingService { - private static final int AUTO_MAX_CAPACITY = 4; - private static final String AUTO_DESCRIPTION = "AUTO_MATCHING"; - private final SseService sseService; - private final AutoMatchingProducer autoMatchingProducer; private final MatchingAlgorithmService matchingAlgorithmService; + private final MatchingEventFactory matchingEventFactory; + private final AutoMatchingProducer autoMatchingProducer; public SseEmitter handleSubscribe(Long userId) { return this.sseService.subscribe(userId); @@ -65,31 +63,19 @@ public AutoMatchingPostResponse handlerAutoRequestMatching( private void sendMatchRoomCreatedEvent(Long memberId, AutoMatchingPostRequest autoMatchingPostRequest) { - MatchRoomCreatedEvent createdEvent = MatchRoomCreatedEvent.builder() - .hostMemberId(memberId) - .startPoint(autoMatchingPostRequest.startPoint()) - .startName(autoMatchingPostRequest.startName()) - .destinationPoint(autoMatchingPostRequest.destinationPoint()) - .destinationName(autoMatchingPostRequest.destinationName()) - .maxCapacity(AUTO_MAX_CAPACITY) - .title(UUID.randomUUID().toString()) - .description(AUTO_DESCRIPTION) - .expectedTotalCharge(autoMatchingPostRequest.expectedTotalCharge()) - .criteria(autoMatchingPostRequest.getCriteria()) - .build(); - - this.autoMatchingProducer.sendMatchRoomCreatedEvent(createdEvent); + this.autoMatchingProducer.sendEvent(this.matchingEventFactory.createMatchRoomCreatedEvent(memberId, autoMatchingPostRequest)); } private void sendMatchMemberJoinedEvent(Long memberId, FindRoomResult roomResult) { Long roomId = roomResult.roomId(); + this.autoMatchingProducer.sendEvent(this.matchingEventFactory.createMatchMemberJoinedEvent(roomId, memberId)); + } + + public AutoMatchingPostResponse handlerAutoCancelMatching(Long memberId, + AutoMatchingCancelledRequest autoMatchingCancelledRequest) { - MatchMemberJoinedEvent joinedEvent = MatchMemberJoinedEvent.builder() - .roomId(roomId) - .memberId(memberId) - .joinedAt(LocalDateTime.now()) - .build(); + this.autoMatchingProducer.sendEvent(this.matchingEventFactory.createMatchMemberCancelledEvent(autoMatchingCancelledRequest.roomId(), memberId)); - this.autoMatchingProducer.sendMatchMemberJoinedEvent(joinedEvent); + return AutoMatchingPostResponse.of(AutoMatchingStatus.CANCELLED); } } diff --git a/src/main/java/com/gachtaxi/domain/matching/common/service/MatchingRoomService.java b/src/main/java/com/gachtaxi/domain/matching/common/service/MatchingRoomService.java index 703bc0ad..cb02b2aa 100644 --- a/src/main/java/com/gachtaxi/domain/matching/common/service/MatchingRoomService.java +++ b/src/main/java/com/gachtaxi/domain/matching/common/service/MatchingRoomService.java @@ -4,23 +4,33 @@ import com.gachtaxi.domain.matching.common.entity.MatchingRoomTagInfo; import com.gachtaxi.domain.matching.common.entity.MemberMatchingRoomChargingInfo; import com.gachtaxi.domain.matching.common.entity.Route; -import com.gachtaxi.domain.matching.common.entity.enums.MatchingRoomStatus; +import com.gachtaxi.domain.matching.common.entity.enums.PaymentStatus; import com.gachtaxi.domain.matching.common.entity.enums.Tags; +import com.gachtaxi.domain.matching.common.exception.MemberAlreadyLeftMatchingRoomException; +import com.gachtaxi.domain.matching.common.exception.MemberNotInMatchingRoomException; import com.gachtaxi.domain.matching.common.exception.NoSuchMatchingRoomException; import com.gachtaxi.domain.matching.common.exception.NotActiveMatchingRoomException; import com.gachtaxi.domain.matching.common.repository.MatchingRoomRepository; import com.gachtaxi.domain.matching.common.repository.MatchingRoomTagInfoRepository; import com.gachtaxi.domain.matching.common.repository.MemberMatchingRoomChargingInfoRepository; import com.gachtaxi.domain.matching.common.repository.RouteRepository; +import com.gachtaxi.domain.matching.event.MatchingEventFactory; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchMemberCancelledEvent; import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchMemberJoinedEvent; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCancelledEvent; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCompletedEvent; import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCreatedEvent; +import com.gachtaxi.domain.matching.event.service.kafka.AutoMatchingProducer; import com.gachtaxi.domain.members.entity.Members; import com.gachtaxi.domain.members.service.MemberService; import jakarta.transaction.Transactional; import java.util.List; +import java.util.Optional; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +@Slf4j @Service @RequiredArgsConstructor @Transactional @@ -28,6 +38,7 @@ public class MatchingRoomService { // service private final MemberService memberService; + private final AutoMatchingProducer autoMatchingProducer; // repository private final MatchingRoomRepository matchingRoomRepository; @@ -35,25 +46,20 @@ public class MatchingRoomService { private final RouteRepository routeRepository; private final MemberMatchingRoomChargingInfoRepository memberMatchingRoomChargingInfoRepository; - public MatchingRoom save(MatchRoomCreatedEvent matchRoomCreatedEvent) { - Members members = this.memberService.findById(matchRoomCreatedEvent.hostMemberId()); + // event factory + private final MatchingEventFactory matchingEventFactory; + + public void createMatchingRoom(MatchRoomCreatedEvent matchRoomCreatedEvent) { + Members members = this.memberService.findById(matchRoomCreatedEvent.roomMasterId()); Route route = this.saveRoute(matchRoomCreatedEvent); - MatchingRoom matchingRoom = MatchingRoom.builder() - .capacity(matchRoomCreatedEvent.maxCapacity()) - .roomMaster(members) - .title(matchRoomCreatedEvent.title()) - .description(matchRoomCreatedEvent.description()) - .route(route) - .totalCharge(matchRoomCreatedEvent.expectedTotalCharge()) - .matchingRoomStatus(MatchingRoomStatus.ACTIVE) - .build(); + MatchingRoom matchingRoom = MatchingRoom.activeOf(matchRoomCreatedEvent, members, route); this.saveMatchingRoomTagInfo(matchingRoom, matchRoomCreatedEvent.criteria()); - this.saveHostMemberChargingInfo(matchingRoom, members); + this.saveRoomMasterChargingInfo(matchingRoom, members); - return this.matchingRoomRepository.save(matchingRoom); + this.matchingRoomRepository.save(matchingRoom); } private Route saveRoute(MatchRoomCreatedEvent matchRoomCreatedEvent) { @@ -77,51 +83,47 @@ private Route saveRoute(MatchRoomCreatedEvent matchRoomCreatedEvent) { } private void saveMatchingRoomTagInfo(MatchingRoom matchingRoom, List tags) { - for (Tags tag : tags) { - MatchingRoomTagInfo matchingRoomTagInfo = MatchingRoomTagInfo.builder() - .matchingRoom(matchingRoom) - .tags(tag) - .build(); - - this.matchingRoomTagInfoRepository.save(matchingRoomTagInfo); - } + tags.forEach(tag -> this.matchingRoomTagInfoRepository.save(MatchingRoomTagInfo.of(matchingRoom, tag))); } - private void saveHostMemberChargingInfo(MatchingRoom matchingRoom, Members members) { - MemberMatchingRoomChargingInfo matchingRoomChargingInfo = MemberMatchingRoomChargingInfo.builder() - .matchingRoom(matchingRoom) - .members(members) - .charge(matchingRoom.getTotalCharge()) - .build(); - - this.memberMatchingRoomChargingInfoRepository.save(matchingRoomChargingInfo); + private void saveRoomMasterChargingInfo(MatchingRoom matchingRoom, Members members) { + this.memberMatchingRoomChargingInfoRepository.save(MemberMatchingRoomChargingInfo.notPayedOf(matchingRoom, members)); } public void joinMemberToMatchingRoom(MatchMemberJoinedEvent matchMemberJoinedEvent) { Members members = this.memberService.findById(matchMemberJoinedEvent.memberId()); - MatchingRoom matchingRoom = this.matchingRoomRepository.findById( - matchMemberJoinedEvent.roomId()).orElseThrow( - NoSuchMatchingRoomException::new); + MatchingRoom matchingRoom = this.matchingRoomRepository.findById(matchMemberJoinedEvent.roomId()).orElseThrow(NoSuchMatchingRoomException::new); - if (!matchingRoom.isActiveMatchingRoom()) { + if (!matchingRoom.isActive()) { throw new NotActiveMatchingRoomException(); } - List existMembers = this.memberMatchingRoomChargingInfoRepository.findByMatchingRoom( - matchingRoom); + MemberMatchingRoomChargingInfo requestedMembersInfo = null; + + Optional joinedInPast = this.alreadyJoinedInPast(members, matchingRoom); - // TODO: 딱 떨어지지 않는 금액은 어떻게 해야할지? - int distributedCharge = matchingRoom.getTotalCharge() / (existMembers.size() + 1); + if (joinedInPast.isPresent()) { + requestedMembersInfo = joinedInPast.get().joinMatchingRoom(); + } else { + requestedMembersInfo = MemberMatchingRoomChargingInfo.notPayedOf(matchingRoom, members); + } + this.memberMatchingRoomChargingInfoRepository.save(requestedMembersInfo); + + List existMembers = this.memberMatchingRoomChargingInfoRepository.findByMatchingRoomAndPaymentStatus(matchingRoom, PaymentStatus.NOT_PAYED); - this.memberMatchingRoomChargingInfoRepository.save( - MemberMatchingRoomChargingInfo.builder() - .matchingRoom(matchingRoom) - .members(members) - .charge(distributedCharge) - .build() - ); + int distributedCharge = (int) Math.ceil((double) matchingRoom.getTotalCharge() / (existMembers.size() + 1)); this.updateExistMembersCharge(existMembers, distributedCharge); + + int nowMemberCount = existMembers.size() + 1; + + if (matchingRoom.isFull(nowMemberCount)) { + this.autoMatchingProducer.sendEvent(this.matchingEventFactory.createMatchRoomCompletedEvent(matchingRoom.getId())); + } + } + + private Optional alreadyJoinedInPast(Members members, MatchingRoom matchingRoom) { + return this.memberMatchingRoomChargingInfoRepository.findByMembersAndMatchingRoom(members, matchingRoom); } private void updateExistMembersCharge(List existMembers, int charge) { @@ -130,4 +132,65 @@ private void updateExistMembersCharge(List exist } this.memberMatchingRoomChargingInfoRepository.saveAll(existMembers); } + + public void leaveMemberFromMatchingRoom(MatchMemberCancelledEvent matchMemberCancelledEvent) { + Members members = this.memberService.findById(matchMemberCancelledEvent.memberId()); + MatchingRoom matchingRoom = this.matchingRoomRepository.findById(matchMemberCancelledEvent.roomId()).orElseThrow(NoSuchMatchingRoomException::new); + + MemberMatchingRoomChargingInfo memberMatchingRoomChargingInfo = + this.memberMatchingRoomChargingInfoRepository.findByMembersAndMatchingRoom(members, matchingRoom) + .orElseThrow(MemberNotInMatchingRoomException::new); + + if (memberMatchingRoomChargingInfo.isAlreadyLeft()) { + throw new MemberAlreadyLeftMatchingRoomException(); + } + + memberMatchingRoomChargingInfo.leftMatchingRoom(); + + this.memberMatchingRoomChargingInfoRepository.save(memberMatchingRoomChargingInfo); + + if (members.isRoomMaster(matchingRoom)) { + this.findNextRoomMaster(matchingRoom, members) + .ifPresentOrElse( + nextRoomMaster -> matchingRoom.changeRoomMaster(nextRoomMaster), + () -> this.autoMatchingProducer.sendEvent(this.matchingEventFactory.createMatchRoomCancelledEvent(matchingRoom.getId())) + ); + } + } + + private Optional findNextRoomMaster(MatchingRoom matchingRoom, Members members) { + List existMembers = + this.memberMatchingRoomChargingInfoRepository.findByMatchingRoomAndPaymentStatus(matchingRoom, PaymentStatus.NOT_PAYED); + + return existMembers.stream() + .map(MemberMatchingRoomChargingInfo::getMembers) + .filter(member -> !member.equals(members)) + .findFirst(); + } + + public void cancelMatchingRoom(MatchRoomCancelledEvent matchRoomCancelledEvent) { + MatchingRoom matchingRoom = this.getMatchingRoomById(matchRoomCancelledEvent.roomId()); + + if (!matchingRoom.isActive()) { + throw new NotActiveMatchingRoomException(); + } + + matchingRoom.cancelMatchingRoom(); + this.matchingRoomRepository.save(matchingRoom); + } + + public void completeMatchingRoom(MatchRoomCompletedEvent matchRoomCompletedEvent) { + MatchingRoom matchingRoom = this.getMatchingRoomById(matchRoomCompletedEvent.roomId()); + + if (!matchingRoom.isActive()) { + throw new NotActiveMatchingRoomException(); + } + + matchingRoom.completeMatchingRoom(); + this.matchingRoomRepository.save(matchingRoom); + } + + private MatchingRoom getMatchingRoomById(Long roomId) { + return this.matchingRoomRepository.findById(roomId).orElseThrow(NoSuchMatchingRoomException::new); + } } diff --git a/src/main/java/com/gachtaxi/domain/matching/event/MatchingEventFactory.java b/src/main/java/com/gachtaxi/domain/matching/event/MatchingEventFactory.java new file mode 100644 index 00000000..d44bf9bb --- /dev/null +++ b/src/main/java/com/gachtaxi/domain/matching/event/MatchingEventFactory.java @@ -0,0 +1,55 @@ +package com.gachtaxi.domain.matching.event; + +import com.gachtaxi.domain.matching.common.dto.request.AutoMatchingPostRequest; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchMemberCancelledEvent; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchMemberJoinedEvent; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCancelledEvent; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCompletedEvent; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCreatedEvent; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Component +public class MatchingEventFactory { + + @Value("${gachtaxi.kafka.topics.match-member-cancelled}") + private String matchMemberCancelledTopic; + + @Value("${gachtaxi.kafka.topics.match-member-joined}") + private String matchMemberJoinedTopic; + + @Value("${gachtaxi.kafka.topics.match-room-cancelled}") + private String matchRoomCancelledTopic; + + @Value("${gachtaxi.kafka.topics.match-room-completed}") + private String matchRoomCompletedTopic; + + @Value("${gachtaxi.kafka.topics.match-room-created}") + private String matchRoomCreatedTopic; + + @Value("${gachtaxi.matching.auto-matching-max-capacity}") + private int autoMaxCapacity; + + @Value("${gachtaxi.matching.auto-matching-description}") + private String autoDescription; + + public MatchMemberCancelledEvent createMatchMemberCancelledEvent(Long roomId, Long memberId) { + return MatchMemberCancelledEvent.of(roomId, memberId, this.matchMemberCancelledTopic); + } + + public MatchMemberJoinedEvent createMatchMemberJoinedEvent(Long roomId, Long memberId) { + return MatchMemberJoinedEvent.of(roomId, memberId, this.matchMemberJoinedTopic); + } + + public MatchRoomCancelledEvent createMatchRoomCancelledEvent(Long roomId) { + return MatchRoomCancelledEvent.of(roomId, this.matchRoomCancelledTopic); + } + + public MatchRoomCompletedEvent createMatchRoomCompletedEvent(Long roomId) { + return MatchRoomCompletedEvent.of(roomId, this.matchRoomCompletedTopic); + } + + public MatchRoomCreatedEvent createMatchRoomCreatedEvent(Long memberId, AutoMatchingPostRequest autoMatchingPostRequest) { + return MatchRoomCreatedEvent.of(memberId, autoMatchingPostRequest, this.autoMaxCapacity, this.autoDescription, this.matchRoomCreatedTopic); + } +} diff --git a/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchMemberCancelledEvent.java b/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchMemberCancelledEvent.java new file mode 100644 index 00000000..32922887 --- /dev/null +++ b/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchMemberCancelledEvent.java @@ -0,0 +1,37 @@ +package com.gachtaxi.domain.matching.event.dto.kafka_topic; + +import com.fasterxml.jackson.annotation.JsonFormat; +import java.time.LocalDateTime; +import lombok.AccessLevel; +import lombok.Builder; +import org.springframework.beans.factory.annotation.Value; + +@Builder(access = AccessLevel.PRIVATE) +public record MatchMemberCancelledEvent( + Long roomId, + Long memberId, + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + LocalDateTime canceledAt, + + String topic +) implements MatchingEvent{ + + @Override + public Object getKey() { + return String.valueOf(this.roomId); + } + + @Override + public String getTopic() { + return this.topic; + } + + public static MatchMemberCancelledEvent of(Long roomId, Long memberId, String topic) { + return MatchMemberCancelledEvent.builder() + .roomId(roomId) + .memberId(memberId) + .topic(topic) + .build(); + } +} diff --git a/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchMemberJoinedEvent.java b/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchMemberJoinedEvent.java index a1a313b8..22d1f395 100644 --- a/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchMemberJoinedEvent.java +++ b/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchMemberJoinedEvent.java @@ -2,18 +2,36 @@ import com.fasterxml.jackson.annotation.JsonFormat; import java.time.LocalDateTime; +import lombok.AccessLevel; import lombok.Builder; +import org.springframework.beans.factory.annotation.Value; -@Builder +@Builder(access = AccessLevel.PRIVATE) public record MatchMemberJoinedEvent( Long roomId, Long memberId, @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") - LocalDateTime joinedAt -) { + LocalDateTime joinedAt, - public static MatchMemberJoinedEvent of(Long roomId, Long memberId) { - return new MatchMemberJoinedEvent(roomId, memberId, LocalDateTime.now()); + String topic +) implements MatchingEvent{ + + @Override + public Object getKey() { + return String.valueOf(this.roomId); + } + + @Override + public String getTopic() { + return this.topic; + } + + public static MatchMemberJoinedEvent of(Long roomId, Long memberId, String topic) { + return MatchMemberJoinedEvent.builder() + .roomId(roomId) + .memberId(memberId) + .topic(topic) + .build(); } } diff --git a/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchRoomCancelledEvent.java b/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchRoomCancelledEvent.java new file mode 100644 index 00000000..b6870b74 --- /dev/null +++ b/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchRoomCancelledEvent.java @@ -0,0 +1,30 @@ +package com.gachtaxi.domain.matching.event.dto.kafka_topic; + +import lombok.AccessLevel; +import lombok.Builder; +import org.springframework.beans.factory.annotation.Value; + +@Builder(access = AccessLevel.PRIVATE) +public record MatchRoomCancelledEvent( + Long roomId, + + String topic +) implements MatchingEvent{ + + @Override + public Object getKey() { + return String.valueOf(this.roomId); + } + + @Override + public String getTopic() { + return this.topic; + } + + public static MatchRoomCancelledEvent of(Long roomId, String topic) { + return MatchRoomCancelledEvent.builder() + .roomId(roomId) + .topic(topic) + .build(); + } +} diff --git a/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchRoomCompletedEvent.java b/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchRoomCompletedEvent.java new file mode 100644 index 00000000..80669452 --- /dev/null +++ b/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchRoomCompletedEvent.java @@ -0,0 +1,30 @@ +package com.gachtaxi.domain.matching.event.dto.kafka_topic; + +import lombok.AccessLevel; +import lombok.Builder; +import org.springframework.beans.factory.annotation.Value; + +@Builder(access = AccessLevel.PRIVATE) +public record MatchRoomCompletedEvent( + Long roomId, + + String topic +) implements MatchingEvent{ + + @Override + public Object getKey() { + return String.valueOf(this.roomId); + } + + @Override + public String getTopic() { + return this.topic; + } + + public static MatchRoomCompletedEvent of(Long roomId, String topic) { + return MatchRoomCompletedEvent.builder() + .roomId(roomId) + .topic(topic) + .build(); + } +} diff --git a/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchRoomCreatedEvent.java b/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchRoomCreatedEvent.java index e1ca21c8..89333de7 100644 --- a/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchRoomCreatedEvent.java +++ b/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchRoomCreatedEvent.java @@ -1,14 +1,18 @@ package com.gachtaxi.domain.matching.event.dto.kafka_topic; import com.fasterxml.jackson.annotation.JsonFormat; +import com.gachtaxi.domain.matching.common.dto.request.AutoMatchingPostRequest; import com.gachtaxi.domain.matching.common.entity.enums.Tags; import java.time.LocalDateTime; import java.util.List; +import java.util.UUID; +import lombok.AccessLevel; import lombok.Builder; +import org.springframework.beans.factory.annotation.Value; -@Builder +@Builder(access = AccessLevel.PRIVATE) public record MatchRoomCreatedEvent( - Long hostMemberId, + Long roomMasterId, Integer maxCapacity, String title, String description, @@ -20,7 +24,40 @@ public record MatchRoomCreatedEvent( Integer expectedTotalCharge, @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") - LocalDateTime createdAt -) { + LocalDateTime createdAt, + String topic +) implements MatchingEvent{ + + @Override + public Object getKey() { + return null; + } + + @Override + public String getTopic() { + return this.topic; + } + + public static MatchRoomCreatedEvent of( + Long roomMasterId, + AutoMatchingPostRequest autoMatchingPostRequest, + int maxCapacity, + String description, + String topic + ) { + return MatchRoomCreatedEvent.builder() + .roomMasterId(roomMasterId) + .startPoint(autoMatchingPostRequest.startPoint()) + .startName(autoMatchingPostRequest.startName()) + .destinationPoint(autoMatchingPostRequest.destinationPoint()) + .destinationName(autoMatchingPostRequest.destinationName()) + .maxCapacity(maxCapacity) + .title(UUID.randomUUID().toString()) + .description(description) + .expectedTotalCharge(autoMatchingPostRequest.expectedTotalCharge()) + .criteria(autoMatchingPostRequest.getCriteria()) + .topic(topic) + .build(); + } } diff --git a/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchingEvent.java b/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchingEvent.java new file mode 100644 index 00000000..a2b0ae4a --- /dev/null +++ b/src/main/java/com/gachtaxi/domain/matching/event/dto/kafka_topic/MatchingEvent.java @@ -0,0 +1,7 @@ +package com.gachtaxi.domain.matching.event.dto.kafka_topic; + +public interface MatchingEvent { + + Object getKey(); + String getTopic(); +} diff --git a/src/main/java/com/gachtaxi/domain/matching/event/service/kafka/AutoMatchingConsumer.java b/src/main/java/com/gachtaxi/domain/matching/event/service/kafka/AutoMatchingConsumer.java index 9f041594..62b71401 100644 --- a/src/main/java/com/gachtaxi/domain/matching/event/service/kafka/AutoMatchingConsumer.java +++ b/src/main/java/com/gachtaxi/domain/matching/event/service/kafka/AutoMatchingConsumer.java @@ -1,7 +1,10 @@ package com.gachtaxi.domain.matching.event.service.kafka; import com.gachtaxi.domain.matching.common.service.MatchingRoomService; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchMemberCancelledEvent; import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchMemberJoinedEvent; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCancelledEvent; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCompletedEvent; import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCreatedEvent; import com.gachtaxi.domain.matching.event.service.sse.SseService; import lombok.RequiredArgsConstructor; @@ -29,13 +32,14 @@ public void onMatchRoomCreated(MatchRoomCreatedEvent event, Acknowledgment ack) try { log.info("[KAFKA CONSUMER] Received MatchRoomCreatedEvent: {}", event); - this.matchingRoomService.save(event); + this.matchingRoomService.createMatchingRoom(event); - this.sseService.sendToClient(event.hostMemberId(), "MATCH_ROOM_CREATED", event); + this.sseService.sendToClient(event.roomMasterId(), "MATCH_ROOM_CREATED", event); ack.acknowledge(); } catch (Exception e) { log.error("[KAFKA CONSUMER] Error processing MatchRoomCreatedEvent", e); + this.sseService.sendToClient(event.roomMasterId(), "MATCH_ROOM_CREATED", e.getMessage()); } } @@ -52,12 +56,78 @@ public void onMatchMemberJoined(MatchMemberJoinedEvent event, Acknowledgment ack this.matchingRoomService.joinMemberToMatchingRoom(event); - this.sseService.sendToClient(event.memberId(), "MATCH_MEMBER_JOINED", event); this.sseService.broadcast("MATCH_MEMBER_JOINED", event); ack.acknowledge(); } catch (Exception e) { log.error("[KAFKA CONSUMER] Error processing MatchMemberJoinedEvent", e); + this.sseService.sendToClient(event.memberId(), "MATCH_MEMBER_JOINED", e.getMessage()); + } + } + + /** + * 방 멤버 취소 이벤트 구독 + */ + @KafkaListener( + topics = "${gachtaxi.kafka.topics.match-member-cancelled}", + containerFactory = "matchMemberCancelledEventListenerFactory" + ) + public void onMatchMemberLeft(MatchMemberCancelledEvent event, Acknowledgment ack) { + try { + log.info("[KAFKA CONSUMER] Received MatchMemberLeftEvent: {}", event); + + this.matchingRoomService.leaveMemberFromMatchingRoom(event); + + this.sseService.broadcast("MATCH_MEMBER_LEFT", event); + + ack.acknowledge(); + } catch (Exception e) { + log.error("[KAFKA CONSUMER] Error processing MatchMemberLeftEvent", e); + this.sseService.sendToClient(event.memberId(), "MATCH_MEMBER_LEFT", e.getMessage()); + } + } + + /** + * 방 취소 이벤트 구독 + */ + @KafkaListener( + topics = "${gachtaxi.kafka.topics.match-room-cancelled}", + containerFactory = "matchRoomCancelledEventListenerFactory" + ) + public void onMatchRoomCancelled(MatchRoomCancelledEvent event, Acknowledgment ack) { + try { + log.info("[KAFKA CONSUMER] Received MatchRoomCancelledEvent: {}", event); + + this.matchingRoomService.cancelMatchingRoom(event); + + this.sseService.broadcast("MATCH_ROOM_CANCELLED", event); + + ack.acknowledge(); + } catch (Exception e) { + log.error("[KAFKA CONSUMER] Error processing MatchRoomCancelledEvent", e); + this.sseService.sendToClient(event.roomId(), "MATCH_ROOM_CANCELLED", e.getMessage()); + } + } + + /** + * 방 완료 이벤트 구독 + */ + @KafkaListener( + topics = "${gachtaxi.kafka.topics.match-room-completed}", + containerFactory = "matchRoomCompletedEventListenerFactory" + ) + public void onMatchingRoomCompleted(MatchRoomCompletedEvent event, Acknowledgment ack) { + try { + log.info("[KAFKA CONSUMER] Received MatchingRoomCompletedEvent: {}", event); + + this.matchingRoomService.completeMatchingRoom(event); + + this.sseService.broadcast("MATCH_ROOM_COMPLETED", event); + + ack.acknowledge(); + } catch (Exception e) { + log.error("[KAFKA CONSUMER] Error processing MatchingRoomCompletedEvent", e); + this.sseService.sendToClient(event.roomId(), "MATCH_ROOM_COMPLETED", e.getMessage()); } } } \ No newline at end of file diff --git a/src/main/java/com/gachtaxi/domain/matching/event/service/kafka/AutoMatchingProducer.java b/src/main/java/com/gachtaxi/domain/matching/event/service/kafka/AutoMatchingProducer.java index 406b9d37..037c85a6 100644 --- a/src/main/java/com/gachtaxi/domain/matching/event/service/kafka/AutoMatchingProducer.java +++ b/src/main/java/com/gachtaxi/domain/matching/event/service/kafka/AutoMatchingProducer.java @@ -1,70 +1,49 @@ package com.gachtaxi.domain.matching.event.service.kafka; -import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchMemberJoinedEvent; -import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCreatedEvent; +import static com.gachtaxi.global.auth.jwt.util.kafka.KafkaBeanSuffix.KAFKA_TEMPLATE_SUFFIX; + +import com.gachtaxi.domain.matching.common.exception.NotDefinedKafkaTemplateException; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchingEvent; +import com.gachtaxi.global.auth.jwt.util.KafkaBeanUtils; import java.util.concurrent.CompletableFuture; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.RecordMetadata; -import org.springframework.beans.factory.annotation.Value; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Service; +import org.springframework.stereotype.Component; -@Slf4j -@Service +@Component @RequiredArgsConstructor +@Slf4j public class AutoMatchingProducer { - private final KafkaTemplate matchRoomCreatedEventKafkaTemplate; - private final KafkaTemplate matchMemberJoinedEventKafkaTemplate; - - @Value("${kafka.topic.match-room-created}") - private String matchRoomCreatedTopic; - - @Value("${kafka.topic.match-member-joined}") - private String matchMemberJoinedTopic; - - /** - * 방 생성 이벤트를 발행 - */ - public void sendMatchRoomCreatedEvent(MatchRoomCreatedEvent matchRoomCreatedEvent) { - String key = matchRoomCreatedEvent.title(); - - CompletableFuture future = this.matchRoomCreatedEventKafkaTemplate.send(matchRoomCreatedTopic, key, matchRoomCreatedEvent); - - future.thenAccept(result -> { - if (result instanceof RecordMetadata metadata) { - log.info("[KAFKA PRODUCER] Success sending MatchRoomCreatedEvent: " - + "topic={}, partition={}, offset={}, key={}", - metadata.topic(), metadata.partition(), metadata.offset(), key - ); - } - } - ).exceptionally(ex -> { - log.error("[KAFKA PRODUCER] Failed to send MatchRoomCreatedEvent key={}", key, ex); - return null; - }); - } + private final ApplicationContext applicationContext; - /** - * 방 멤버 참가 이벤트를 발행 - */ - public void sendMatchMemberJoinedEvent(MatchMemberJoinedEvent matchMemberJoinedEvent) { - String key = String.valueOf(matchMemberJoinedEvent.roomId()); + public void sendEvent(MatchingEvent matchingEvent) { + String topic = matchingEvent.getTopic(); + Object key = matchingEvent.getKey(); - CompletableFuture future = this.matchMemberJoinedEventKafkaTemplate.send(matchMemberJoinedTopic, key, matchMemberJoinedEvent); + try { + KafkaTemplate kafkaTemplate = this.applicationContext.getBean( + KafkaBeanUtils.getBeanName(topic, KAFKA_TEMPLATE_SUFFIX), KafkaTemplate.class); + CompletableFuture future = kafkaTemplate.send(matchingEvent.getTopic(), matchingEvent.getKey(), matchingEvent); - future.thenAccept(result -> { - if (result instanceof RecordMetadata metadata) { - log.info("[KAFKA PRODUCER] Success sending MatchMemberJoinedEvent: " - + "topic={}, partition={}, offset={}, key={}", - metadata.topic(), metadata.partition(), metadata.offset(), key - ); + future.thenAccept(result -> { + if (result instanceof RecordMetadata metadata) { + log.info("[KAFKA PRODUCER] Success sending MatchRoomCreatedEvent: " + + "topic={}, partition={}, offset={}, key={}", + metadata.topic(), metadata.partition(), metadata.offset(), key + ); + } } - } - ).exceptionally(ex -> { - log.error("[KAFKA PRODUCER] Failed to send MatchMemberJoinedEvent key={}", key, ex); - return null; - }); + ).exceptionally(ex -> { + log.error("[KAFKA PRODUCER] Failed to send MatchRoomCreatedEvent key={}", key, ex); + return null; + }); + } catch (BeansException beansException) { + throw new NotDefinedKafkaTemplateException(); + } } -} \ No newline at end of file +} diff --git a/src/main/java/com/gachtaxi/domain/members/entity/Members.java b/src/main/java/com/gachtaxi/domain/members/entity/Members.java index f61773e9..920c6dbc 100644 --- a/src/main/java/com/gachtaxi/domain/members/entity/Members.java +++ b/src/main/java/com/gachtaxi/domain/members/entity/Members.java @@ -1,5 +1,7 @@ package com.gachtaxi.domain.members.entity; +import com.gachtaxi.domain.matching.common.entity.MatchingRoom; +import com.gachtaxi.domain.members.dto.request.UserSignUpRequestDto; import com.gachtaxi.domain.members.dto.request.MemberAgreementRequestDto; import com.gachtaxi.domain.members.dto.request.MemberSupplmentRequestDto; import com.gachtaxi.domain.members.entity.enums.Gender; @@ -7,6 +9,7 @@ import com.gachtaxi.domain.members.entity.enums.UserStatus; import com.gachtaxi.global.common.entity.BaseEntity; import jakarta.persistence.*; +import java.util.Objects; import lombok.AccessLevel; import lombok.Builder; import lombok.Getter; @@ -128,6 +131,28 @@ public static Members ofKakaoId(Long kakaoId){ .build(); } + public boolean isRoomMaster(MatchingRoom matchingRoom){ + return this.equals(matchingRoom.getRoomMaster()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Members members = (Members) o; + return Objects.equals(studentNumber, members.studentNumber) && Objects.equals( + kakaoId, members.kakaoId); + } + + @Override + public int hashCode() { + return Objects.hash(studentNumber, kakaoId); + } + public static Members ofGoogleId(String googleId){ return Members.builder() .googleId(googleId) diff --git a/src/main/java/com/gachtaxi/global/auth/jwt/util/KafkaBeanUtils.java b/src/main/java/com/gachtaxi/global/auth/jwt/util/KafkaBeanUtils.java new file mode 100644 index 00000000..b31e8655 --- /dev/null +++ b/src/main/java/com/gachtaxi/global/auth/jwt/util/KafkaBeanUtils.java @@ -0,0 +1,25 @@ +package com.gachtaxi.global.auth.jwt.util; + +import java.util.StringTokenizer; + +public abstract class KafkaBeanUtils { + + public static String getBeanName(String topic, String suffix) { + StringTokenizer stringTokenizer = new StringTokenizer(topic, "-_"); + + StringBuilder beanNameBuilder = new StringBuilder(); + beanNameBuilder.append(stringTokenizer.nextToken()); + + while (stringTokenizer.hasMoreTokens()) { + beanNameBuilder.append(getFirstUpperString(stringTokenizer.nextToken())); + } + + beanNameBuilder.append(suffix); + + return beanNameBuilder.toString(); + } + + private static String getFirstUpperString(String str) { + return str.substring(0, 1).toUpperCase() + str.substring(1); + } +} diff --git a/src/main/java/com/gachtaxi/global/auth/jwt/util/kafka/KafkaBeanSuffix.java b/src/main/java/com/gachtaxi/global/auth/jwt/util/kafka/KafkaBeanSuffix.java new file mode 100644 index 00000000..b008743b --- /dev/null +++ b/src/main/java/com/gachtaxi/global/auth/jwt/util/kafka/KafkaBeanSuffix.java @@ -0,0 +1,8 @@ +package com.gachtaxi.global.auth.jwt.util.kafka; + +public abstract class KafkaBeanSuffix { + + public static final String PRODUCER_FACTORY_SUFFIX = "ProducerFactory"; + public static final String NEW_TOPIC_SUFFIX = "Topic"; + public static final String KAFKA_TEMPLATE_SUFFIX = "KafkaTemplate"; +} diff --git a/src/main/java/com/gachtaxi/global/config/kafka/DefaultKafkaProducerConfig.java b/src/main/java/com/gachtaxi/global/config/kafka/DefaultKafkaProducerConfig.java new file mode 100644 index 00000000..d2a79cbc --- /dev/null +++ b/src/main/java/com/gachtaxi/global/config/kafka/DefaultKafkaProducerConfig.java @@ -0,0 +1,42 @@ +package com.gachtaxi.global.config.kafka; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +@Configuration +public class DefaultKafkaProducerConfig { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Primary + @Bean + public ProducerFactory producerFactory() { + Map configs = new HashMap<>(); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + + configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + configs.put(ProducerConfig.ACKS_CONFIG, "all"); + configs.put(ProducerConfig.RETRIES_CONFIG, 3); + + return new DefaultKafkaProducerFactory<>(configs); + } + + @Primary + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} \ No newline at end of file diff --git a/src/main/java/com/gachtaxi/global/config/kafka/KafkaBeanRegistrar.java b/src/main/java/com/gachtaxi/global/config/kafka/KafkaBeanRegistrar.java new file mode 100644 index 00000000..dc2b3472 --- /dev/null +++ b/src/main/java/com/gachtaxi/global/config/kafka/KafkaBeanRegistrar.java @@ -0,0 +1,102 @@ +package com.gachtaxi.global.config.kafka; + +import static com.gachtaxi.global.auth.jwt.util.kafka.KafkaBeanSuffix.KAFKA_TEMPLATE_SUFFIX; +import static com.gachtaxi.global.auth.jwt.util.kafka.KafkaBeanSuffix.NEW_TOPIC_SUFFIX; +import static com.gachtaxi.global.auth.jwt.util.kafka.KafkaBeanSuffix.PRODUCER_FACTORY_SUFFIX; + +import com.gachtaxi.global.auth.jwt.util.KafkaBeanUtils; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.support.AbstractBeanDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.BeanDefinitionRegistry; +import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor; +import org.springframework.boot.context.properties.bind.Bindable; +import org.springframework.boot.context.properties.bind.Binder; +import org.springframework.context.EnvironmentAware; +import org.springframework.core.env.Environment; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.stereotype.Component; + +@Component +public class KafkaBeanRegistrar implements BeanDefinitionRegistryPostProcessor, EnvironmentAware { + + private Environment environment; + + @Override + public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) + throws BeansException { + Map topics = Binder.get(environment) + .bind("gachtaxi.kafka.topics", Bindable.mapOf(String.class, String.class)) + .orElse(Collections.emptyMap()); + + for (String topic : topics.values()) { + this.registerProducerFactoryAndKafkaTemplate(topic, registry); + this.registerNewTopic(topic, registry); + } + } + + private void registerKafkaTemplate(String topic, String producerFactoryBeanName, BeanDefinitionRegistry registry) { + String kafkaTemplateBeanName = KafkaBeanUtils.getBeanName(topic, KAFKA_TEMPLATE_SUFFIX); + AbstractBeanDefinition kafkaTemplateBeanDefinition = + BeanDefinitionBuilder.genericBeanDefinition(KafkaTemplate.class) + .addConstructorArgReference(producerFactoryBeanName) + .getBeanDefinition(); + + registry.registerBeanDefinition(kafkaTemplateBeanName, kafkaTemplateBeanDefinition); + } + + private void registerNewTopic(String topic, BeanDefinitionRegistry registry) { + short partitionCount = Short.valueOf(this.environment.getProperty("gachtaxi.kafka.partition-count")); + short replicationFactor = Short.valueOf(this.environment.getProperty("gachtaxi.kafka.replication-factor")); + + String topicBeanName = KafkaBeanUtils.getBeanName(topic, NEW_TOPIC_SUFFIX); + AbstractBeanDefinition newTopicBeanDefinition = + BeanDefinitionBuilder.genericBeanDefinition(NewTopic.class) + .addConstructorArgValue(topic) + .addConstructorArgValue(partitionCount) + .addConstructorArgValue(replicationFactor) + .getBeanDefinition(); + + registry.registerBeanDefinition(topicBeanName, newTopicBeanDefinition); + } + + private void registerProducerFactoryAndKafkaTemplate(String topic, BeanDefinitionRegistry registry) { + String producerBeanName = KafkaBeanUtils.getBeanName(topic, PRODUCER_FACTORY_SUFFIX); + AbstractBeanDefinition producerBeanDefinition = + BeanDefinitionBuilder.genericBeanDefinition(DefaultKafkaProducerFactory.class) + .addConstructorArgValue(this.getProducerOptions()) + .getBeanDefinition(); + + registry.registerBeanDefinition(producerBeanName, producerBeanDefinition); + + this.registerKafkaTemplate(topic, producerBeanName, registry); + } + + private Map getProducerOptions() { + String bootstrapServers = this.environment.getProperty("spring.kafka.bootstrap-servers"); + + Map configs = new HashMap<>(); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + + configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + configs.put(ProducerConfig.ACKS_CONFIG, "all"); + configs.put(ProducerConfig.RETRIES_CONFIG, 3); + + return configs; + } + + @Override + public void setEnvironment(Environment environment) { + this.environment = environment; + } +} diff --git a/src/main/java/com/gachtaxi/global/config/kafka/KafkaConsumerConfig.java b/src/main/java/com/gachtaxi/global/config/kafka/KafkaConsumerConfig.java index 4cfb0cc9..25a925ea 100644 --- a/src/main/java/com/gachtaxi/global/config/kafka/KafkaConsumerConfig.java +++ b/src/main/java/com/gachtaxi/global/config/kafka/KafkaConsumerConfig.java @@ -1,6 +1,9 @@ package com.gachtaxi.global.config.kafka; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchMemberCancelledEvent; import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchMemberJoinedEvent; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCancelledEvent; +import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCompletedEvent; import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCreatedEvent; import java.util.HashMap; import java.util.Map; @@ -72,4 +75,79 @@ public ConcurrentKafkaListenerContainerFactory m factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); return factory; } + + // MatchMemberCanceledEvent + @Bean + public ConsumerFactory matchMemberCancelledEventConsumerFactory() { + Map configs = new HashMap<>(); + configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + JsonDeserializer jsonDeserializer = + new JsonDeserializer<>(MatchMemberCancelledEvent.class); + jsonDeserializer.addTrustedPackages("com.gachtaxi.domain.matching.event.dto"); + + return new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), jsonDeserializer); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory matchMemberCancelledEventListenerFactory() { + ConcurrentKafkaListenerContainerFactory factory + = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(matchMemberCancelledEventConsumerFactory()); + factory.setConcurrency(3); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + return factory; + } + + // MatchRoomCancelledEvent + @Bean + public ConsumerFactory matchRoomCancelledEventConsumerFactory() { + Map configs = new HashMap<>(); + configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + JsonDeserializer jsonDeserializer = + new JsonDeserializer<>(MatchRoomCancelledEvent.class); + jsonDeserializer.addTrustedPackages("com.gachtaxi.domain.matching.event.dto"); + + return new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), jsonDeserializer); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory matchRoomCancelledEventListenerFactory() { + ConcurrentKafkaListenerContainerFactory factory + = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(matchRoomCancelledEventConsumerFactory()); + factory.setConcurrency(3); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + return factory; + } + + // MatchRoomCompleted + @Bean + public ConsumerFactory matchRoomCompletedEventConsumerFactory() { + Map configs = new HashMap<>(); + configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + JsonDeserializer jsonDeserializer = + new JsonDeserializer<>(MatchRoomCompletedEvent.class); + jsonDeserializer.addTrustedPackages("com.gachtaxi.domain.matching.event.dto"); + + return new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), jsonDeserializer); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory matchRoomCompletedEventListenerFactory() { + ConcurrentKafkaListenerContainerFactory factory + = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(matchRoomCompletedEventConsumerFactory()); + factory.setConcurrency(3); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + return factory; + } } \ No newline at end of file diff --git a/src/main/java/com/gachtaxi/global/config/kafka/KafkaProducerConfig.java b/src/main/java/com/gachtaxi/global/config/kafka/KafkaProducerConfig.java deleted file mode 100644 index 651f676d..00000000 --- a/src/main/java/com/gachtaxi/global/config/kafka/KafkaProducerConfig.java +++ /dev/null @@ -1,65 +0,0 @@ -package com.gachtaxi.global.config.kafka; - -import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchMemberJoinedEvent; -import com.gachtaxi.domain.matching.event.dto.kafka_topic.MatchRoomCreatedEvent; -import java.util.HashMap; -import java.util.Map; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.support.serializer.JsonSerializer; - -@Configuration -public class KafkaProducerConfig { - - @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServers; - - @Bean - @Qualifier("matchRoomCreatedEventProducerFactory") - public ProducerFactory matchRoomCreatedEventProducerFactory() { - Map configs = new HashMap<>(); - configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); - - configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); - configs.put(ProducerConfig.ACKS_CONFIG, "all"); - configs.put(ProducerConfig.RETRIES_CONFIG, 3); - - return new DefaultKafkaProducerFactory<>(configs); - } - - @Bean - @Qualifier("matchMemberJoinedEventProducerFactory") - public ProducerFactory matchMemberJoinedEventProducerFactory() { - Map configs = new HashMap<>(); - configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); - - configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); - configs.put(ProducerConfig.ACKS_CONFIG, "all"); - configs.put(ProducerConfig.RETRIES_CONFIG, 3); - - return new DefaultKafkaProducerFactory<>(configs); - } - - @Bean - @Qualifier("matchMemberJoinedEventKafkaTemplate") - public KafkaTemplate matchMemberJoinedEventKafkaTemplate() { - return new KafkaTemplate<>(matchMemberJoinedEventProducerFactory()); - } - - @Bean - @Qualifier("matchRoomCreatedEventKafkaTemplate") - public KafkaTemplate matchRoomCreatedEventKafkaTemplate() { - return new KafkaTemplate<>(matchRoomCreatedEventProducerFactory()); - } -} \ No newline at end of file diff --git a/src/main/java/com/gachtaxi/global/config/kafka/KafkaTopicsConfig.java b/src/main/java/com/gachtaxi/global/config/kafka/KafkaTopicsConfig.java deleted file mode 100644 index 4ecda46e..00000000 --- a/src/main/java/com/gachtaxi/global/config/kafka/KafkaTopicsConfig.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.gachtaxi.global.config.kafka; - -import org.apache.kafka.clients.admin.NewTopic; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration -public class KafkaTopicsConfig { - - @Value("${gachtaxi.kafka.topics.match-room-created}") - private String matchRoomCreatedTopic; - - @Value("${gachtaxi.kafka.topics.match-member-joined}") - private String matchMemberJoinedTopic; - - @Value("${gachtaxi.kafka.topics.match-room-cancelled}") - private String matchRoomCancelledTopic; - - @Value("${gachtaxi.kafka.partition-count}") - private short partitionCount; - - @Value("${gachtaxi.kafka.replication-factor}") - private short replicationFactor; - - @Bean - public NewTopic matchRoomCreatedTopic() { - return new NewTopic(matchRoomCreatedTopic, partitionCount, replicationFactor); - } - - @Bean - public NewTopic matchMemberJoinedTopic() { - return new NewTopic(matchMemberJoinedTopic, partitionCount, replicationFactor); - } - - @Bean - public NewTopic matchRoomCancelledTopic() { - return new NewTopic(matchRoomCancelledTopic, partitionCount, replicationFactor); - } -} \ No newline at end of file diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml index 9f47f5e8..09a29df4 100644 --- a/src/main/resources/application-local.yml +++ b/src/main/resources/application-local.yml @@ -68,7 +68,9 @@ gachtaxi: match-member-joined: ${KAFKA_TOPIC_MATCH_MEMBER_JOINED} match-member-cancelled: ${KAFKA_TOPIC_MATCH_MEMBER_CANCELLED} match-room-cancelled: ${KAFKA_TOPIC_MATCH_ROOM_CANCELLED} + match-room-completed: ${KAFKA_TOPIC_MATCH_ROOM_COMPLETED} partition-count: ${KAFKA_PARTITION_COUNT} replication-factor: ${KAFKA_REPLICATION_FACTOR} - - + matching: + auto-matching-max-capacity: ${AUTO_MATCHING_MAX_CAPACITY} + auto-matching-description: ${AUTO_MATCHING_DESCRIPTION}