본문 바로가기
Research Notes (개인)

Redis Pipeline 완전 정리

4) Pipeline (RTT 감소, 처리량)

✅ Redis Pipeline이 왜 성능에 도움이 되나요?

RTT(Round Trip Time) 누적 제거가 핵심입니다.

# Pipeline 없이 (N번의 RTT)
Client → [SET k1] → Server → [OK] → Client
Client → [SET k2] → Server → [OK] → Client
Client → [SET k3] → Server → [OK] → Client
총 시간 = N × RTT + 처리시간

# Pipeline 사용 (1번의 RTT)
Client → [SET k1, SET k2, SET k3] → Server → [OK, OK, OK] → Client
총 시간 = 1 × RTT + 처리시간

네트워크 레이턴시가 1ms라도, 10,000번 호출 시:

  • 미사용: 10초 (네트워크만)
  • 사용: ~0ms (한 번에 전송)

✅ Pipeline vs Transaction 차이

항목PipelineMULTI/EXEC
목적네트워크 왕복 감소원자적 실행 보장
원자성❌ 없음✅ 있음 (제한적)
중간 끼어들기✅ 가능 (명령 사이에 다른 클라이언트 실행됨)❌ 불가
서버 측 큐잉❌ 클라이언트가 모아서 한번에 전송✅ 서버 큐에 저장
실패 시개별 명령 결과 확인 필요런타임 에러 시 부분 실패

Pipeline은 네트워크 최적화 도구, Transaction은 동시성 제어 도구

동시에 사용 가능:

// Pipeline + Transaction 동시 사용
pipeline.multi();
pipeline.set("key1", "val1");
pipeline.set("key2", "val2");
pipeline.exec();
pipeline.sync();

✅ Pipeline 사용 시 주의할 점

1. 응답 순서 보장

// 명령 순서 = 응답 순서 (순서는 보장되지만 확인해야 함)
pipeline.set("k1", "v1");  // 응답[0] = OK
pipeline.incr("counter");  // 응답[1] = (증가된 값)
pipeline.get("k1");        // 응답[2] = "v1"
List<Object> results = pipeline.syncAndReturnAll();
// results.get(1)로 각 응답 개별 처리 필요

2. 메모리 버퍼 주의

// ❌ 한 번에 너무 많이
for (int i = 0; i < 1_000_000; i++) {
    pipeline.set("key" + i, "val" + i); // 클라이언트 메모리 폭발
}

// ✅ 배치로 나눠서 처리
int BATCH_SIZE = 1000;
for (int i = 0; i < 1_000_000; i++) {
    pipeline.set("key" + i, "val" + i);
    if (i % BATCH_SIZE == 0) {
        pipeline.sync(); // 중간중간 flush
    }
}

3. 부분 실패 처리

List<Object> results = pipeline.syncAndReturnAll();
for (int i = 0; i < results.size(); i++) {
    if (results.get(i) instanceof Exception) {
        // i번째 명령 실패 → 개별 처리
        log.error("Pipeline command {} failed", i);
    }
}

✅ 대량 조회/쓰기에서 Pipeline 최적화 경험

실무 패턴 예시 (대량 캐시 워밍업):

// ❌ N+1 문제: DB에서 1만건 조회 후 1만번 Redis SET 호출
users.forEach(user -> redisTemplate.opsForValue().set("user:" + user.getId(), user));

// ✅ Pipeline으로 배치 처리
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
    users.forEach(user -> {
        byte[] key = ("user:" + user.getId()).getBytes();
        byte[] val = serialize(user);
        connection.set(key, val);
    });
    return null;
});

성능 비교 (10,000건 기준, 로컬 Redis):

방식소요 시간
개별 호출~2,000ms
Pipeline (1,000건 배치)~50ms
Pipeline (전체 한 번에)~20ms

✅ 클러스터 모드에서 Pipeline 동작

핵심 문제: 클러스터에서 키는 슬롯에 따라 여러 노드에 분산됩니다.

Node A: slot 0-5460
Node B: slot 5461-10922  
Node C: slot 10923-16383

key1 → slot 4000 → Node A
key2 → slot 8000 → Node B  ← 다른 노드!
key3 → slot 12000 → Node C ← 또 다른 노드!

클라이언트별 처리 방식:

클라이언트클러스터 Pipeline 지원
Jedis❌ 수동으로 노드별 분류 필요
Lettuce✅ 자동으로 노드별 파이프라인 분리 처리
ioredis (Node.js)✅ 자동 처리
// Lettuce (Spring Data Redis) - 클러스터 자동 처리
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
    // Lettuce가 내부적으로 슬롯별로 노드 분류 후
    // 각 노드에 병렬 Pipeline 전송
    connection.set(key1, val1); // → Node A
    connection.set(key2, val2); // → Node B (자동 라우팅)
    return null;
});

주의: 클러스터 Pipeline에서는 MGET, MSET 등 다중 키 명령 사용 불가 (또는 Hash Tag 필요)

# ❌ 클러스터에서 에러
MGET key1 key2 key3

# ✅ Hash Tag로 같은 슬롯 보장
MGET {user}:key1 {user}:key2 {user}:key3

한눈에 보는 정리

┌─────────────────────────────────────────────────────────────┐
│                    Redis 동시성 도구 비교                      │
├──────────────┬─────────────┬──────────────┬─────────────────┤
│              │  Lua Script │  MULTI/EXEC  │    Pipeline     │
├──────────────┼─────────────┼──────────────┼─────────────────┤
│ 목적         │ 원자적 로직  │ 명령 묶음     │ 네트워크 최적화  │
│ 원자성       │ ✅ 강함      │ ✅ 제한적     │ ❌             │
│ 롤백         │ ❌          │ ❌           │ ❌             │
│ 조건 분기    │ ✅          │ ❌           │ ❌             │
│ 성능         │ 중간         │ 중간          │ ✅ 매우 좋음    │
│ 복잡도       │ 높음         │ 낮음          │ 낮음            │
└──────────────┴─────────────┴──────────────┴─────────────────┘

Redis Pipeline 원리


핵심 한 줄

"여러 명령을 한 번에 묶어서 보내고, 한 번에 응답 받는다"
= RTT(Round Trip Time)를 N번에서 1번으로 줄이는 것

RTT가 뭔지부터

명령 1번 실행 시:

Client                    Redis Server
  │                            │
  │──── SET key1 "val" ───────→│
  │                            │ 처리 (μs)
  │←─────────── OK ────────────│
  │                            │
  └── 여기까지가 1 RTT ─────────┘

RTT = 명령 전송 + 처리 + 응답 수신 시간
      (네트워크가 멀수록 RTT 큼)

로컬: 0.1ms
같은 IDC: 1ms
다른 리전: 100ms+

Pipeline 없을 때 문제

명령 10,000개를 개별 실행:

Client          Redis
  │──→ SET k1 ──→│
  │←── OK ←──────│  (1 RTT)
  │──→ SET k2 ──→│
  │←── OK ←──────│  (2 RTT)
  │──→ SET k3 ──→│
  │←── OK ←──────│  (3 RTT)
  ...
  │──→ SET k10000→│
  │←── OK ←──────│  (10,000 RTT)

총 시간 = 10,000 × RTT
RTT = 1ms 가정 시 → 10초

Pipeline 사용 시

명령 10,000개를 묶어서 전송:

Client                         Redis
  │                               │
  │──→ SET k1                     │
  │──→ SET k2                     │
  │──→ SET k3    (한 번에 전송)   →│ 순서대로 처리
  │──→ ...                        │
  │──→ SET k10000                 │
  │                               │
  │←── OK, OK, OK ... (한 번에) ──│
  │                               │
  └────────── 1 RTT ──────────────┘

총 시간 = 1 × RTT + 처리시간
RTT = 1ms 가정 시 → ~0.01초

싱글 스레드 큐와의 관계

지난번에 배운 것:
  "싱글 스레드가 큐에서 실행 목록을 하나씩 처리"

Pipeline은 이 큐를 활용:

Pipeline 없이:
  [명령1] → 처리 → 응답 → [명령2] → 처리 → 응답 → ...
   (매번 네트워크 왕복 대기)

Pipeline 사용:
  [명령1, 명령2, 명령3 ... 한 번에 큐에 적재]
       ↓
  싱글 스레드가 큐에서 순서대로 처리
       ↓
  응답 한 번에 전송

→ 네트워크 대기 시간 제거
→ 싱글 스레드 처리는 동일 (그냥 더 많이 쌓임)

중요한 특성: 원자성 없음

Pipeline은 그냥 "명령을 모아서 보내는 것"
서버에서는 여전히 하나씩 순서대로 처리

Client A (Pipeline)               Client B
────────────────────────────────────────────────
SET k1 ──┐
SET k2 ──┤──→ Redis 큐에 적재
SET k3 ──┘
                   Redis 큐: [SET k1] [SET k2] [SET k3]
                                         ↑
                              [Client B: SET k2 "침입"] 끼어들기 가능!

→ k1 설정됨
→ B의 k2 설정됨  ← 끼어들었음
→ A의 k2 설정됨  ← A가 덮어씀
→ k3 설정됨

Pipeline vs MULTI/EXEC vs Lua 비교

┌──────────────┬───────────────┬──────────────┬──────────────┐
│              │   Pipeline    │  MULTI/EXEC  │  Lua Script  │
├──────────────┼───────────────┼──────────────┼──────────────┤
│ 목적         │ RTT 감소      │ 실행 순서 보장│ 완전한 원자  │
│ 원자성       │ ❌            │ ✅ (제한적)  │ ✅ (강함)   │
│ 끼어들기     │ 가능          │ EXEC 후 불가 │ 전혀 불가   │
│ 네트워크     │ 1 RTT         │ 1 RTT        │ 1 RTT       │
│ 조건 분기    │ ❌            │ ❌           │ ✅          │
└──────────────┴───────────────┴──────────────┴──────────────┘

Java 코드로 보는 차이

// ❌ Pipeline 없이 (N번 RTT)
for (int i = 0; i < 10_000; i++) {
    redisTemplate.opsForValue().set("key:" + i, "val:" + i);
}
// 10,000번 네트워크 왕복


// ✅ Pipeline 사용 (1번 RTT)
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
    for (int i = 0; i < 10_000; i++) {
        connection.set(
            ("key:" + i).getBytes(),
            ("val:" + i).getBytes()
        );
    }
    return null;
});
// 1번 네트워크 왕복

주의할 점

1. 메모리 버퍼 주의
   명령 100만개를 한 번에 → 클라이언트 메모리 폭발

   ✅ 배치로 나눠서 처리
   int BATCH = 1000;
   for (int i = 0; i < total; i++) {
       pipe.set(key, val);
       if (i % BATCH == 0) pipe.flush();  // 중간 전송
   }


2. 응답 순서 = 명령 순서
   pipe.set("k1", "v1");   → results[0] = OK
   pipe.incr("counter");   → results[1] = (숫자)
   pipe.get("k1");         → results[2] = "v1"


3. 부분 실패 처리
   results.forEach(result -> {
       if (result instanceof Exception) {
           // 이 명령만 실패, 나머지는 성공
       }
   });

최종 요약

┌─────────────────────────────────────────────────────────┐
│                  Pipeline 핵심 원리                      │
│                                                         │
│  본질: 네트워크 왕복(RTT)을 N번 → 1번으로 줄이는 것      │
│                                                         │
│  방법: 명령들을 클라이언트에서 모아서                    │
│        한 번에 전송 → 한 번에 응답 수신                  │
│                                                         │
│  싱글 스레드 큐와의 관계:                                │
│    큐에 한 번에 많이 적재 → 순서대로 처리                │
│    처리 방식은 동일, 네트워크 대기만 제거                 │
│                                                         │
│  주의:                                                   │
│    원자성 없음 (끼어들기 가능)                           │
│    원자성 필요하면 MULTI/EXEC 또는 Lua 사용              │
│                                                         │
└─────────────────────────────────────────────────────────┘

Java Redis Pipeline 예시 코드


기본 사용법

@Service
public class RedisPipelineService {

    private final StringRedisTemplate redisTemplate;

    // ============================================
    // 기본 Pipeline
    // ============================================
    public List<Object> basicPipeline() {
        return redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            connection.set("key1".getBytes(), "value1".getBytes());
            connection.set("key2".getBytes(), "value2".getBytes());
            connection.set("key3".getBytes(), "value3".getBytes());
            return null;  // 반드시 null 반환
        });
        // 결과: [true, true, true]
    }
}

실전 예시 1: 대량 저장

// 10,000건 캐시 저장 성능 비교
@Service
public class BulkCacheService {

    private final StringRedisTemplate redisTemplate;
    private final ObjectMapper objectMapper;

    // ❌ Pipeline 없이 (N번 RTT → 느림)
    public void saveWithoutPipeline(List<Product> products) {
        long start = System.currentTimeMillis();

        products.forEach(product -> {
            redisTemplate.opsForValue().set(
                "product:" + product.getId(),
                serialize(product),
                Duration.ofMinutes(10)
            );
        });

        System.out.println("개별 저장: " + (System.currentTimeMillis() - start) + "ms");
        // 결과: 약 2000ms (10,000건 기준)
    }

    // ✅ Pipeline 사용 (1번 RTT → 빠름)
    public void saveWithPipeline(List<Product> products) {
        long start = System.currentTimeMillis();

        redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            products.forEach(product -> {
                byte[] key = ("product:" + product.getId()).getBytes();
                byte[] val = serialize(product).getBytes();

                connection.setEx(key, 600, val);  // TTL 600초 포함
            });
            return null;
        });

        System.out.println("Pipeline 저장: " + (System.currentTimeMillis() - start) + "ms");
        // 결과: 약 50ms (10,000건 기준)
    }


    // ✅ 배치 단위 Pipeline (메모리 버퍼 관리)
    public void saveWithBatchPipeline(List<Product> products) {
        int BATCH_SIZE = 500;  // 한 번에 500개씩

        for (int i = 0; i < products.size(); i += BATCH_SIZE) {
            int end = Math.min(i + BATCH_SIZE, products.size());
            List<Product> batch = products.subList(i, end);

            redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
                batch.forEach(product -> {
                    byte[] key = ("product:" + product.getId()).getBytes();
                    byte[] val = serialize(product).getBytes();
                    connection.setEx(key, 600, val);
                });
                return null;
            });

            System.out.println((i + batch.size()) + "건 처리 완료");
        }
    }
}

실전 예시 2: 대량 조회

// 여러 키 한 번에 조회
public Map<String, String> multiGet(List<Long> productIds) {

    // ✅ Pipeline으로 대량 GET
    List<Object> results = redisTemplate.executePipelined(
        (RedisCallback<Object>) connection -> {
            productIds.forEach(id -> {
                connection.get(("product:" + id).getBytes());
            });
            return null;
        }
    );

    // 결과 매핑
    Map<String, String> resultMap = new HashMap<>();
    for (int i = 0; i < productIds.size(); i++) {
        Object result = results.get(i);
        if (result != null) {
            resultMap.put("product:" + productIds.get(i), (String) result);
        }
    }

    return resultMap;
}

실전 예시 3: 응답 순서 활용

// Pipeline 응답은 명령 순서와 동일
public void pipelineWithResults() {

    List<Object> results = redisTemplate.executePipelined(
        (RedisCallback<Object>) connection -> {
            connection.set("name".getBytes(), "Alice".getBytes());  // 명령 0
            connection.incr("visit_count".getBytes());              // 명령 1
            connection.lPush("recent".getBytes(),                   // 명령 2
                "page1".getBytes());
            connection.get("name".getBytes());                      // 명령 3
            return null;
        }
    );

    // 명령 순서 = 응답 순서 보장
    Boolean setResult   = (Boolean) results.get(0);  // SET 결과
    Long    incrResult  = (Long)    results.get(1);  // INCR 결과
    Long    pushResult  = (Long)    results.get(2);  // LPUSH 결과
    String  getResult   = (String)  results.get(3);  // GET 결과

    System.out.println("SET:   " + setResult);   // true
    System.out.println("INCR:  " + incrResult);  // 1
    System.out.println("LPUSH: " + pushResult);  // 1
    System.out.println("GET:   " + getResult);   // "Alice"
}

실전 예시 4: 부분 실패 처리

// Pipeline은 원자성 없음 → 부분 실패 처리 필요
public void handlePartialFailure(List<String> keys) {

    List<Object> results = redisTemplate.executePipelined(
        (RedisCallback<Object>) connection -> {
            keys.forEach(key -> connection.get(key.getBytes()));
            return null;
        }
    );

    List<String> successKeys = new ArrayList<>();
    List<String> failedKeys  = new ArrayList<>();

    for (int i = 0; i < results.size(); i++) {
        if (results.get(i) instanceof Exception) {
            // 이 명령만 실패
            failedKeys.add(keys.get(i));
            log.error("키 조회 실패: {}", keys.get(i));
        } else {
            successKeys.add(keys.get(i));
        }
    }

    // 실패한 키는 DB에서 재조회
    if (!failedKeys.isEmpty()) {
        loadFromDatabase(failedKeys);
    }
}

실전 예시 5: 캐시 워밍업 (실무 패턴)

// 서버 시작 시 DB 데이터를 Redis로 대량 로딩
@Component
public class CacheWarmupService {

    private final StringRedisTemplate redisTemplate;
    private final ProductRepository   productRepository;

    @EventListener(ApplicationReadyEvent.class)
    public void warmup() {
        log.info("캐시 워밍업 시작");
        long start = System.currentTimeMillis();

        // DB에서 전체 조회
        List<Product> products = productRepository.findAll();

        // Pipeline으로 일괄 캐싱
        int BATCH_SIZE = 1000;
        int total = products.size();

        for (int i = 0; i < total; i += BATCH_SIZE) {
            List<Product> batch = products.subList(
                i, Math.min(i + BATCH_SIZE, total)
            );

            redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
                batch.forEach(product -> {
                    String key = "product:" + product.getId();
                    String val = serialize(product);

                    connection.setEx(
                        key.getBytes(),
                        3600,           // 1시간 TTL
                        val.getBytes()
                    );
                });
                return null;
            });

            log.info("워밍업 진행: {}/{}", Math.min(i + BATCH_SIZE, total), total);
        }

        log.info("캐시 워밍업 완료: {}ms, {}건",
            System.currentTimeMillis() - start, total);
    }
}

성능 비교 테스트

@SpringBootTest
class PipelinePerformanceTest {

    @Autowired StringRedisTemplate redisTemplate;

    @Test
    void performanceComparison() {
        int COUNT = 10_000;

        // ─── 개별 호출 ───
        long start = System.currentTimeMillis();
        for (int i = 0; i < COUNT; i++) {
            redisTemplate.opsForValue().set("key:" + i, "val:" + i);
        }
        System.out.println("개별 호출: " + (System.currentTimeMillis() - start) + "ms");


        // ─── Pipeline (전체) ───
        start = System.currentTimeMillis();
        redisTemplate.executePipelined((RedisCallback<Object>) conn -> {
            for (int i = 0; i < COUNT; i++) {
                conn.set(("key:" + i).getBytes(), ("val:" + i).getBytes());
            }
            return null;
        });
        System.out.println("Pipeline 전체: " + (System.currentTimeMillis() - start) + "ms");


        // ─── Pipeline (배치 1000) ───
        start = System.currentTimeMillis();
        for (int i = 0; i < COUNT; i += 1000) {
            final int from = i;
            redisTemplate.executePipelined((RedisCallback<Object>) conn -> {
                for (int j = from; j < Math.min(from + 1000, COUNT); j++) {
                    conn.set(("key:" + j).getBytes(), ("val:" + j).getBytes());
                }
                return null;
            });
        }
        System.out.println("Pipeline 배치: " + (System.currentTimeMillis() - start) + "ms");
    }
}

// 결과 (10,000건, 로컬 Redis 기준):
// 개별 호출:     약 1500~2000ms
// Pipeline 전체: 약 20~30ms   ← 약 70배 빠름
// Pipeline 배치: 약 30~50ms   ← 메모리 안전

최종 요약

┌─────────────────────────────────────────────────────────┐
│               Pipeline 실무 사용 기준                    │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ✅ 쓸 때:                                              │
│     대량 저장 / 대량 조회                                │
│     캐시 워밍업                                          │
│     원자성 불필요한 배치 작업                            │
│                                                         │
│  ⚠️ 배치 크기:                                          │
│     500 ~ 1000개 단위 권장                              │
│     너무 크면 클라이언트 메모리 문제                      │
│                                                         │
│  ❌ 쓰면 안 될 때:                                      │
│     원자성 필요 → MULTI/EXEC 또는 Lua                   │
│     명령 간 의존성 있을 때 (앞 결과로 다음 결정)         │
│                                                         │
└─────────────────────────────────────────────────────────┘