본문 바로가기
Case Studies (실무)

고QPS 조회 API Redis 캐시 도입기

PK 캐싱 + Outbox Invalidation, 그리고 DB CPU 상승으로 “무효화 자동화”를 축소한 이유 (Spring @SqsListener 기반)

요약

  • 1시간에 약 4,000회 호출되는 조회 API가 DB CPU를 지속적으로 압박하고 있었습니다.
  • Redis를 Cache-aside로 붙였고, stale 영향을 줄이기 위해 결과 전체가 아니라 “Driving table의 PK 리스트만” 캐시했습니다.
  • 정합성을 위해 Outbox → Relay(배치) → SQS → Consumer무효화(Invalidation) 를 자동화했습니다.
  • 하지만 Consumer가 무효화 판단을 위해 DB를 추가 조회하고, 무효화로 MISS가 증가하면서 heavy query 재실행이 늘어 DB CPU 상승이 관측되었습니다.
  • 운영 안정성을 우선하여 무효화 자동화 경로를 축소(비활성화/범위 축소) 하고, 최종 안전망으로 TTL 15분 + Jitter(만료 분산) 를 적용했습니다.
  • SQS 처리는 Spring @SqsListener에서 성공 시 자동 ACK(삭제) / 예외 시 재전달(재시도) 흐름으로 구성했습니다.

1) 배경: 읽기는 많고 쿼리는 무겁습니다

특정 조회 API가 1시간에 약 4,000번 호출되며, 내부적으로 조인/필터/정렬/페이지네이션이 결합된 “무거운 쿼리”가 수행되고 있었습니다. 읽기 트래픽이 안정적으로 많아 DB CPU가 점진적으로 상승했고, 피크 구간에서 더 민감하게 반응했습니다.


2) 목표와 원칙

목표

  1. DB 부하(특히 CPU) 완화
  2. Redis 장애/타임아웃이 발생해도 조회 API는 DB 폴백으로 가용성 유지
  3. stale 최소화(업무/그룹웨어 특성상 오래된 데이터는 신뢰도 하락으로 직결됩니다)

원칙

  • 캐시는 Cache-aside(있으면 쓰고, 없으면 DB)로 구현합니다.
  • 정합성 자동화(무효화/최신화)는 시도하되, 비용이 임계치를 넘으면 즉시 축소/비활성화할 수 있어야 합니다.
  • 최종 안전망으로 TTL + Jitter를 적용해 worst-case stale 및 Cache Stampede(동시 만료)를 제한합니다.
  • SQS는 at-least-once 특성상 중복 전달이 가능하므로 Consumer는 멱등해야 합니다.

3) 아키텍처 개요

diagram rendering...

4) 캐시 전략: “결과 전체”가 아니라 “PK 리스트만” 캐싱합니다

업무 시스템에서는 “결과 JSON 전체 캐싱”이 stale 리스크를 크게 만들 수 있습니다. 그래서 다음 전략을 사용했습니다.

  • 캐시에는 Driving table의 PK 리스트만 저장합니다.
  • 실제 화면에 보여줄 상세는 DB에서 다시 조회하여 최신값을 반영합니다.

주의: 이 구조에서 DB 조회가 0이 되는 것은 아닙니다. 다만 “무거운 쿼리” 실행 빈도를 줄이는 것이 목표였습니다.


5) GET API: Cache-aside + Redis 장애 폴백 + TTL(Jitter)

5.1 캐시 키(조회 파라미터 조합 + 해시)

private fun cacheKey(req: ListDocsReq): String {
    val raw = listOf(
        req.workspaceId,
        req.userId,
        req.orgId,
        req.from,
        req.to,
        req.status,
        req.sort,
        req.page,
        req.size,
        req.keyword
    ).joinToString("|") { it?.toString() ?: "" }

    return "doc:list:ws:${req.workspaceId}:${sha256(raw)}"
}

5.2 Cache-aside 예시

@Service
class DocumentQueryService(
    private val redis: StringRedisTemplate,
    private val docRepo: DocumentRepository,
) {
    fun listDocs(req: ListDocsReq): List<DocDto> {
        val key = cacheKey(req)

        val cachedIds: List<Long>? = try {
            redis.opsForValue().get(key)?.let(::decodeIds)
        } catch (_: Exception) {
            // Redis 장애여도 API는 DB로 폴백되어야 합니다.
            null
        }

        return if (!cachedIds.isNullOrEmpty()) {
            // HIT: PK 리스트로 최신 상세를 DB에서 재조회합니다.
            docRepo.findByIdsInOrder(cachedIds).map(DocDto::from)
        } else {
            // MISS: 원본 heavy query를 그대로 실행합니다.
            val rows = docRepo.findByOriginalHeavyQuery(req)

            // 응답은 rows로 주되, 캐시는 PK 리스트만 저장합니다.
            val ids = rows.map { it.id }
            try { cacheIdsAtomicallyWithTtlJitter(key, ids) } catch (_: Exception) {}

            rows.map(DocDto::from)
        }
    }
}

6) TTL 15분 + Jitter(만료 분산)로 Stampede를 완화합니다

TTL을 “정확히 15분”으로만 두면 비슷한 시점에 생성된 키들이 동시에 만료되어 Cache Stampede가 발생할 수 있습니다. 그래서 TTL에 랜덤 Jitter(예: 0~3분)를 추가해 만료 시점을 분산했습니다.

  • Base TTL: 15분(900초)
  • Jitter: 0180초(03분)
  • 최종 TTL: 900~1080초
import java.util.concurrent.ThreadLocalRandom
import org.springframework.data.redis.core.script.DefaultRedisScript

private const val BASE_TTL_SECONDS = 15 * 60
private const val TTL_JITTER_SECONDS = 3 * 60

private fun ttlWithJitterSeconds(): Int {
    val jitter = ThreadLocalRandom.current().nextInt(0, TTL_JITTER_SECONDS + 1)
    return BASE_TTL_SECONDS + jitter
}

private fun cacheIdsAtomicallyWithTtlJitter(key: String, ids: List<Long>) {
    val value = encodeIds(ids)
    val ttlSeconds = ttlWithJitterSeconds()

    // SET + EXPIRE 원자화(Lua)
    val script = """
        redis.call('SET', KEYS[1], ARGV[1])
        redis.call('EXPIRE', KEYS[1], ARGV[2])
        return 1
    """.trimIndent()

    redis.execute(
        DefaultRedisScript(script, Long::class.java),
        listOf(key),
        value,
        ttlSeconds.toString()
    )
}

7) Outbox: 쓰기 트랜잭션에서 이벤트를 기록합니다

핵심은 DML과 outbox 기록이 동일 트랜잭션에서 커밋되는 것입니다.

7.1 outbox 테이블 예시

CREATE TABLE outbox_event (
  id              BIGINT PRIMARY KEY AUTO_INCREMENT,
  aggregate_type  VARCHAR(50) NOT NULL,   -- DOCUMENT, DOCUMENT_ACL, TAG_MAP ...
  aggregate_id    BIGINT NOT NULL,
  event_type      VARCHAR(50) NOT NULL,
  role            VARCHAR(10) NOT NULL,   -- DRIVING / DRIVEN
  op              VARCHAR(10) NOT NULL,   -- INSERT / UPDATE / DELETE
  payload         JSON NULL,              -- scope/pre-image/changedColumns 등
  created_at      DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  published_at    DATETIME NULL,
  publish_attempt INT NOT NULL DEFAULT 0,
  KEY idx_pub (published_at, created_at)
);

7.2 DML 코드 예시 (UPDATE)

@Transactional
fun updateDocument(cmd: UpdateDocumentCommand) {
    docRepo.update(cmd)

    outboxRepo.insert(
        aggregateType = "DOCUMENT",
        aggregateId = cmd.documentId,
        eventType = "DOCUMENT_UPDATED",
        role = "DRIVING",
        op = "UPDATE",
        payload = mapOf(
            "workspaceId" to cmd.workspaceId,
            "changedColumns" to listOf("updated_at", "status")
        )
    )
}

8) Relay 배치: outbox → SQS 전파 + marking

Relay는 outbox의 published_at IS NULL 이벤트를 읽어 SQS로 전파하고 published_at을 마킹합니다. 네트워크/트랜잭션 경계 때문에 “전파 중복”은 언제든 발생할 수 있으므로 Consumer 멱등 처리가 필수입니다.

또한 Consumer의 DB read를 줄이기 위해, SQS 메시지에는 가능한 범위에서 payload(scope/pre-image) 를 포함했습니다.

data class InvalidationMessage(
    val eventId: Long,
    val aggregateType: String,
    val aggregateId: Long,
    val role: String, // DRIVING/DRIVEN
    val op: String,   // INSERT/UPDATE/DELETE
    val payload: Map<String, Any?> = emptyMap()
)

@Component
class OutboxRelayJob(
    private val outboxRepo: OutboxRepository,
    private val sqs: SqsTemplate, // Spring에서 제공하는 템플릿(예시)
) {
    @Scheduled(fixedDelay = 1000)
    fun relay() {
        val events = outboxRepo.findUnpublished(limit = 200)

        for (e in events) {
            try {
                val msg = InvalidationMessage(
                    eventId = e.id,
                    aggregateType = e.aggregateType,
                    aggregateId = e.aggregateId,
                    role = e.role,
                    op = e.op,
                    payload = e.payload
                )

                sqs.send("invalidation-queue") { it.payload(msg) } // 예시
                outboxRepo.markPublished(e.id)
            } catch (_: Exception) {
                outboxRepo.incrementAttempt(e.id)
            }
        }
    }
}

실제 환경에 따라 SqsTemplate/직접 SDK/메시지 컨버터 구성은 다를 수 있습니다. 요지는 “SQS 메시지에 consumer가 필요한 payload를 넣어 outbox 재조회 비용을 줄인다”입니다.


9) Consumer: Spring @SqsListener로 처리합니다 (자동 ACK / 실패 시 재전달)

9.1 @SqsListener에서 ACK는 어디서 일어나는가?

Spring @SqsListener는 보통 다음 정책으로 동작합니다.

  • 성공(예외 없음): 프레임워크가 메시지를 삭제(DeleteMessage) → ACK
  • 실패(예외 throw): 메시지를 삭제하지 않음 → visibility timeout 이후 재전달(재시도)

아래 예시에서는 deletionPolicy = ON_SUCCESS로 명시했습니다.


10) 멱등 처리: “UNIQUE insert 선처리”만으로는 실패 유실이 생깁니다 → 상태/리스(lease)로 보완합니다

processed_event를 먼저 insert(UNIQUE)하고 처리하면 “중복 방지”는 되지만, 아래 케이스에서 이벤트가 유실될 수 있습니다.

  • processed_event insert 성공
  • invalidation 수행 전에 크래시/예외
  • 재전달된 메시지는 “이미 처리됨”으로 스킵 → 실제 invalidation이 한 번도 수행되지 않을 수 있습니다.

따라서 상태(status) + 처리 리스(lease) 를 두는 방식으로 보완했습니다.

10.1 processed_event 테이블 예시(상태/시도/리스)

CREATE TABLE processed_event (
  event_id          BIGINT PRIMARY KEY,
  status            VARCHAR(20) NOT NULL, -- PROCESSING / PROCESSED / FAILED
  attempt           INT NOT NULL DEFAULT 0,
  last_error        TEXT NULL,
  processing_until  DATETIME NULL,        -- lease
  created_at        DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at        DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

10.2 처리 알고리즘(요지)

  • 이벤트 수신 시 tryStart(eventId)
    • 없으면 INSERT (PROCESSING, attempt=1, processing_until=now+lease)
    • 있으면
      • PROCESSED면 스킵
      • FAILED면 재시도 허용(정책에 따라)
      • PROCESSING인데 lease 만료면 takeover 허용
  • 성공하면 PROCESSED로 업데이트
  • 실패하면 FAILED로 업데이트 후 예외를 throw하여 SQS 재전달을 유도합니다(ACK되지 않음)

11) @SqsListener Consumer 예시 코드 (ON_SUCCESS 자동 ACK)

import io.awspring.cloud.sqs.annotation.SqsListener
import io.awspring.cloud.sqs.annotation.SqsMessageDeletionPolicy

@Component
class CacheInvalidationConsumer(
    private val processedRepo: ProcessedEventRepository,
    private val scopeRepo: InvalidationScopeRepository,
    private val redis: StringRedisTemplate,
) {

    @SqsListener(
        value = ["\${app.sqs.invalidation-queue}"],
        deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS
    )
    fun onMessage(msg: InvalidationMessage) {
        // 1) 멱등/리스 기반으로 처리 시작
        val start = processedRepo.tryStart(msg.eventId, leaseSeconds = 30)
        if (start == StartResult.ALREADY_PROCESSED) return
        if (start == StartResult.OTHER_WORKER_PROCESSING) return

        try {
            // 2) invalidation 수행
            handleInvalidation(msg)

            // 3) 성공 마킹
            processedRepo.markProcessed(msg.eventId)

            // ✅ 예외 없이 리턴하면 Spring이 메시지를 삭제(ACK)합니다.
        } catch (e: Exception) {
            // 4) 실패 마킹 후 예외 throw
            processedRepo.markFailed(msg.eventId, e)

            // ❌ 예외를 던지면 Spring이 메시지를 삭제하지 않습니다(ACK 안 됨).
            // -> visibility timeout 이후 재전달(재시도)
            throw e
        }
    }

    private fun handleInvalidation(msg: InvalidationMessage) {
        // Driving/Driven + DML 종류에 따라 분기
        if (msg.role == "DRIVING") {
            handleDriving(msg)
        } else {
            handleDriven(msg)
        }
    }

    private fun handleDriving(msg: InvalidationMessage) {
        // 문서가 driving인 케이스 예시
        // DELETE면 DB에서 못 읽을 수 있으니 payload pre-image 우선 사용
        val workspaceIdFromPayload = (msg.payload["workspaceId"] as? Number)?.toLong()

        val scope = scopeRepo.findScopeByAggregate(msg) // 내부에서 DB 조회할 수도 있고, payload만으로도 처리 가능하게 설계
            ?: run {
                // 최소한 workspace 단위로 무효화(보수적)
                workspaceIdFromPayload?.let { invalidateWorkspace(it) }
                return
            }

        when (msg.op) {
            "INSERT", "DELETE" -> invalidateListScopes(scope)
            "UPDATE" -> {
                val changed = (msg.payload["changedColumns"] as? List<*>)?.filterIsInstance<String>().orEmpty()
                val affectsList = changed.any { it in setOf("deleted","status","updated_at","workspace_id","owner_user_id") }
                if (affectsList) invalidateListScopes(scope)
            }
        }
    }

    private fun handleDriven(msg: InvalidationMessage) {
        when (msg.aggregateType) {
            "DOCUMENT_ACL" -> {
                // ACL 변경은 멤버십 변경 → 영향 사용자 scope만 무효화(가능하면)
                val docId = (msg.payload["documentId"] as? Number)?.toLong() ?: return
                val scope = scopeRepo.findScopeByDocumentId(docId) ?: return
                invalidateUserScopedLists(scope.workspaceId, scope.affectedUserIds)
            }
            "USER" -> {
                // 표시값 변경은 ID 집합 불변인 경우가 많습니다.
                // 단, 쿼리에서 user 컬럼이 WHERE/ORDER BY에 쓰인다면 예외가 될 수 있습니다.
                return
            }
            else -> {
                // 알 수 없으면 보수적으로 workspace 단위 무효화
                val ws = (msg.payload["workspaceId"] as? Number)?.toLong() ?: return
                invalidateWorkspace(ws)
            }
        }
    }

    private fun invalidateListScopes(scope: InvalidationScope) {
        deleteByScan("doc:list:ws:${scope.workspaceId}:*")
        scope.organizationId?.let { deleteByScan("doc:list:ws:${scope.workspaceId}:org:$it:*") }
        invalidateUserScopedLists(scope.workspaceId, scope.affectedUserIds)
    }

    private fun invalidateUserScopedLists(workspaceId: Long, userIds: List<Long>) {
        userIds.forEach { deleteByScan("doc:list:ws:$workspaceId:user:$it:*") }
    }

    private fun invalidateWorkspace(workspaceId: Long) {
        deleteByScan("doc:list:ws:$workspaceId:*")
    }

    private fun deleteByScan(pattern: String) {
        val conn = redis.connectionFactory?.connection ?: return
        try {
            val options = ScanOptions.scanOptions().match(pattern).count(1000).build()
            conn.scan(options).use { cursor ->
                val batch = ArrayList<ByteArray>(500)
                while (cursor.hasNext()) {
                    batch.add(cursor.next())
                    if (batch.size >= 500) {
                        conn.del(*batch.toTypedArray())
                        batch.clear()
                    }
                }
                if (batch.isNotEmpty()) conn.del(*batch.toTypedArray())
            }
        } finally {
            conn.close()
        }
    }
}

중요한 운영 포인트

  • 예외를 잡고 삼켜버리면(throw 안 하면) ON_SUCCESS 정책에서 메시지가 삭제(ACK) 될 수 있습니다.
  • 실패 시 재시도를 원하시면, 실패를 기록한 뒤 반드시 예외를 throw하여 ACK되지 않도록 해야 합니다.
  • 무한 재시도는 DLQ(redrive policy) 로 제어하는 것이 일반적입니다.

12) Driving/Driven + DML에 따른 invalidation 전략(핵심 기준)

핵심 기준은 한 문장으로 정리됩니다.

이 변경이 리스트의 ID 집합(멤버십/순서) 에 영향을 주는가요?

  • Driving 변경: 대체로 멤버십/순서에 영향 → invalidation 빈도가 높습니다.
  • Driven 변경: 쿼리에서 해당 테이블/컬럼이 WHERE/EXISTS/ORDER BY에 관여하는지에 따라 다릅니다.
    • 단순 표시값(SELECT 컬럼만) 변경은 리스트 invalidation을 생략할 수 있습니다.
    • ACL/태그처럼 멤버십이 바뀌는 driven은 invalidation이 필요합니다.

13) 왜 DB CPU가 상승했나요? (무효화 자동화 축소의 이유)

캐시는 읽기 부하를 줄이기 위한 것이었지만, 실제로는 아래 요인이 복합으로 겹치며 DB CPU가 상승했습니다.

  1. MISS에서는 heavy query가 그대로 실행됩니다

    • 이번 구현은 2-step(ID-only)로 쿼리를 재설계하지 않았습니다.
  2. 무효화가 잦아지면 MISS 비율이 올라갑니다

    • 캐시가 자주 비워지면 heavy query 재실행이 증가합니다.
  3. Consumer가 무효화 판단을 위해 DB를 추가 조회했습니다(Write Amplification)

    • 이벤트 1건 처리에 scope 계산용 DB 조회(document/ACL/관계)가 추가되며 비용이 증폭되었습니다.

결국 운영 안정성 관점에서 “무효화 자동화”가 새로운 병목이 될 가능성이 커져, 무효화 자동화 경로를 축소(비활성화/범위 축소) 하고 TTL 안전망을 강화했습니다.


14) 최종 안전망: TTL 15분 + Jitter 유지

무효화 자동화를 축소하더라도 stale를 무한정 허용할 수는 없었습니다. 그래서 최종 안전망으로 TTL 15분 + Jitter를 유지하여 다음을 달성했습니다.

  • worst-case stale 시간을 제한합니다.
  • 동시 만료(Cache Stampede) 리스크를 줄입니다.
  • Redis 장애 시에도 DB 폴백으로 조회 가용성을 유지합니다.

15) 회고: 캐시는 ‘성능 기능’이 아니라 ‘운영 기능’입니다

이번 적용을 통해 다음을 다시 확인했습니다.

  • Cache Invalidation은 생각보다 복잡도를 크게 증가시킵니다. 따라서 최신화가 중요한 영역에서는 가능한 캐시를 사용하지 않는 것이 낫다고 생각이 들었습니다.
  • 캐시는 단순히 빠르게 만드는 기능이 아니라, 정합성·관측·롤백 전략이 함께 포함된 운영 기능입니다.
  • 특히 PK 리스트 캐시 + DB 재조회 구조에서는 invalidation이 잦아지면 MISS가 늘고, heavy query 재실행으로 DB 부하가 다시 올라갈 수 있습니다.
  • 또한 consumer가 invalidation 판단을 위해 DB를 재조회하면, 쓰기 이벤트가 읽기를 증폭시키는 구조(Write Amplification)가 되어 비용이 커질 수 있습니다.
  • 그래서 운영 안정성을 최우선으로 보고 Outbox Invalidation을 롤백하고, 최신화가 필수적이지 않은 부분만 TTL 15분 + Jitter를 최종 안전망으로 적용했습니다.

부록) 운영에서 체크한 지표(예시)

  • DB CPU / Load, 쿼리 p95/p99, Slow Query 비율
  • Redis hit ratio, 키 수/메모리 사용량, 커맨드 latency
  • Outbox lag(미발행 이벤트 적체), Relay 처리량/실패율
  • SQS backlog(대기 메시지), Consumer 처리량/실패율, DLQ 유입률
  • TTL 만료 구간에서 DB 스파이크 여부(Stampede/Herd 징후)