Redis Pipeline 완전 정리
4) Pipeline (RTT 감소, 처리량)
✅ Redis Pipeline이 왜 성능에 도움이 되나요?
RTT(Round Trip Time) 누적 제거가 핵심입니다.
# Pipeline 없이 (N번의 RTT)
Client → [SET k1] → Server → [OK] → Client
Client → [SET k2] → Server → [OK] → Client
Client → [SET k3] → Server → [OK] → Client
총 시간 = N × RTT + 처리시간
# Pipeline 사용 (1번의 RTT)
Client → [SET k1, SET k2, SET k3] → Server → [OK, OK, OK] → Client
총 시간 = 1 × RTT + 처리시간
네트워크 레이턴시가 1ms라도, 10,000번 호출 시:
- 미사용: 10초 (네트워크만)
- 사용: ~0ms (한 번에 전송)
✅ Pipeline vs Transaction 차이
| 항목 | Pipeline | MULTI/EXEC |
|---|---|---|
| 목적 | 네트워크 왕복 감소 | 원자적 실행 보장 |
| 원자성 | ❌ 없음 | ✅ 있음 (제한적) |
| 중간 끼어들기 | ✅ 가능 (명령 사이에 다른 클라이언트 실행됨) | ❌ 불가 |
| 서버 측 큐잉 | ❌ 클라이언트가 모아서 한번에 전송 | ✅ 서버 큐에 저장 |
| 실패 시 | 개별 명령 결과 확인 필요 | 런타임 에러 시 부분 실패 |
Pipeline은 네트워크 최적화 도구, Transaction은 동시성 제어 도구
동시에 사용 가능:
// Pipeline + Transaction 동시 사용
pipeline.multi();
pipeline.set("key1", "val1");
pipeline.set("key2", "val2");
pipeline.exec();
pipeline.sync();
✅ Pipeline 사용 시 주의할 점
1. 응답 순서 보장
// 명령 순서 = 응답 순서 (순서는 보장되지만 확인해야 함)
pipeline.set("k1", "v1"); // 응답[0] = OK
pipeline.incr("counter"); // 응답[1] = (증가된 값)
pipeline.get("k1"); // 응답[2] = "v1"
List<Object> results = pipeline.syncAndReturnAll();
// results.get(1)로 각 응답 개별 처리 필요
2. 메모리 버퍼 주의
// ❌ 한 번에 너무 많이
for (int i = 0; i < 1_000_000; i++) {
pipeline.set("key" + i, "val" + i); // 클라이언트 메모리 폭발
}
// ✅ 배치로 나눠서 처리
int BATCH_SIZE = 1000;
for (int i = 0; i < 1_000_000; i++) {
pipeline.set("key" + i, "val" + i);
if (i % BATCH_SIZE == 0) {
pipeline.sync(); // 중간중간 flush
}
}
3. 부분 실패 처리
List<Object> results = pipeline.syncAndReturnAll();
for (int i = 0; i < results.size(); i++) {
if (results.get(i) instanceof Exception) {
// i번째 명령 실패 → 개별 처리
log.error("Pipeline command {} failed", i);
}
}
✅ 대량 조회/쓰기에서 Pipeline 최적화 경험
실무 패턴 예시 (대량 캐시 워밍업):
// ❌ N+1 문제: DB에서 1만건 조회 후 1만번 Redis SET 호출
users.forEach(user -> redisTemplate.opsForValue().set("user:" + user.getId(), user));
// ✅ Pipeline으로 배치 처리
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
users.forEach(user -> {
byte[] key = ("user:" + user.getId()).getBytes();
byte[] val = serialize(user);
connection.set(key, val);
});
return null;
});
성능 비교 (10,000건 기준, 로컬 Redis):
| 방식 | 소요 시간 |
|---|---|
| 개별 호출 | ~2,000ms |
| Pipeline (1,000건 배치) | ~50ms |
| Pipeline (전체 한 번에) | ~20ms |
✅ 클러스터 모드에서 Pipeline 동작
핵심 문제: 클러스터에서 키는 슬롯에 따라 여러 노드에 분산됩니다.
Node A: slot 0-5460
Node B: slot 5461-10922
Node C: slot 10923-16383
key1 → slot 4000 → Node A
key2 → slot 8000 → Node B ← 다른 노드!
key3 → slot 12000 → Node C ← 또 다른 노드!
클라이언트별 처리 방식:
| 클라이언트 | 클러스터 Pipeline 지원 |
|---|---|
| Jedis | ❌ 수동으로 노드별 분류 필요 |
| Lettuce | ✅ 자동으로 노드별 파이프라인 분리 처리 |
| ioredis (Node.js) | ✅ 자동 처리 |
// Lettuce (Spring Data Redis) - 클러스터 자동 처리
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
// Lettuce가 내부적으로 슬롯별로 노드 분류 후
// 각 노드에 병렬 Pipeline 전송
connection.set(key1, val1); // → Node A
connection.set(key2, val2); // → Node B (자동 라우팅)
return null;
});
주의: 클러스터 Pipeline에서는 MGET, MSET 등 다중 키 명령 사용 불가 (또는 Hash Tag 필요)
# ❌ 클러스터에서 에러
MGET key1 key2 key3
# ✅ Hash Tag로 같은 슬롯 보장
MGET {user}:key1 {user}:key2 {user}:key3
한눈에 보는 정리
┌─────────────────────────────────────────────────────────────┐
│ Redis 동시성 도구 비교 │
├──────────────┬─────────────┬──────────────┬─────────────────┤
│ │ Lua Script │ MULTI/EXEC │ Pipeline │
├──────────────┼─────────────┼──────────────┼─────────────────┤
│ 목적 │ 원자적 로직 │ 명령 묶음 │ 네트워크 최적화 │
│ 원자성 │ ✅ 강함 │ ✅ 제한적 │ ❌ │
│ 롤백 │ ❌ │ ❌ │ ❌ │
│ 조건 분기 │ ✅ │ ❌ │ ❌ │
│ 성능 │ 중간 │ 중간 │ ✅ 매우 좋음 │
│ 복잡도 │ 높음 │ 낮음 │ 낮음 │
└──────────────┴─────────────┴──────────────┴─────────────────┘
Redis Pipeline 원리
핵심 한 줄
"여러 명령을 한 번에 묶어서 보내고, 한 번에 응답 받는다"
= RTT(Round Trip Time)를 N번에서 1번으로 줄이는 것
RTT가 뭔지부터
명령 1번 실행 시:
Client Redis Server
│ │
│──── SET key1 "val" ───────→│
│ │ 처리 (μs)
│←─────────── OK ────────────│
│ │
└── 여기까지가 1 RTT ─────────┘
RTT = 명령 전송 + 처리 + 응답 수신 시간
(네트워크가 멀수록 RTT 큼)
로컬: 0.1ms
같은 IDC: 1ms
다른 리전: 100ms+
Pipeline 없을 때 문제
명령 10,000개를 개별 실행:
Client Redis
│──→ SET k1 ──→│
│←── OK ←──────│ (1 RTT)
│──→ SET k2 ──→│
│←── OK ←──────│ (2 RTT)
│──→ SET k3 ──→│
│←── OK ←──────│ (3 RTT)
...
│──→ SET k10000→│
│←── OK ←──────│ (10,000 RTT)
총 시간 = 10,000 × RTT
RTT = 1ms 가정 시 → 10초
Pipeline 사용 시
명령 10,000개를 묶어서 전송:
Client Redis
│ │
│──→ SET k1 │
│──→ SET k2 │
│──→ SET k3 (한 번에 전송) →│ 순서대로 처리
│──→ ... │
│──→ SET k10000 │
│ │
│←── OK, OK, OK ... (한 번에) ──│
│ │
└────────── 1 RTT ──────────────┘
총 시간 = 1 × RTT + 처리시간
RTT = 1ms 가정 시 → ~0.01초
싱글 스레드 큐와의 관계
지난번에 배운 것:
"싱글 스레드가 큐에서 실행 목록을 하나씩 처리"
Pipeline은 이 큐를 활용:
Pipeline 없이:
[명령1] → 처리 → 응답 → [명령2] → 처리 → 응답 → ...
(매번 네트워크 왕복 대기)
Pipeline 사용:
[명령1, 명령2, 명령3 ... 한 번에 큐에 적재]
↓
싱글 스레드가 큐에서 순서대로 처리
↓
응답 한 번에 전송
→ 네트워크 대기 시간 제거
→ 싱글 스레드 처리는 동일 (그냥 더 많이 쌓임)
중요한 특성: 원자성 없음
Pipeline은 그냥 "명령을 모아서 보내는 것"
서버에서는 여전히 하나씩 순서대로 처리
Client A (Pipeline) Client B
────────────────────────────────────────────────
SET k1 ──┐
SET k2 ──┤──→ Redis 큐에 적재
SET k3 ──┘
Redis 큐: [SET k1] [SET k2] [SET k3]
↑
[Client B: SET k2 "침입"] 끼어들기 가능!
→ k1 설정됨
→ B의 k2 설정됨 ← 끼어들었음
→ A의 k2 설정됨 ← A가 덮어씀
→ k3 설정됨
Pipeline vs MULTI/EXEC vs Lua 비교
┌──────────────┬───────────────┬──────────────┬──────────────┐
│ │ Pipeline │ MULTI/EXEC │ Lua Script │
├──────────────┼───────────────┼──────────────┼──────────────┤
│ 목적 │ RTT 감소 │ 실행 순서 보장│ 완전한 원자 │
│ 원자성 │ ❌ │ ✅ (제한적) │ ✅ (강함) │
│ 끼어들기 │ 가능 │ EXEC 후 불가 │ 전혀 불가 │
│ 네트워크 │ 1 RTT │ 1 RTT │ 1 RTT │
│ 조건 분기 │ ❌ │ ❌ │ ✅ │
└──────────────┴───────────────┴──────────────┴──────────────┘
Java 코드로 보는 차이
// ❌ Pipeline 없이 (N번 RTT)
for (int i = 0; i < 10_000; i++) {
redisTemplate.opsForValue().set("key:" + i, "val:" + i);
}
// 10,000번 네트워크 왕복
// ✅ Pipeline 사용 (1번 RTT)
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
for (int i = 0; i < 10_000; i++) {
connection.set(
("key:" + i).getBytes(),
("val:" + i).getBytes()
);
}
return null;
});
// 1번 네트워크 왕복
주의할 점
1. 메모리 버퍼 주의
명령 100만개를 한 번에 → 클라이언트 메모리 폭발
✅ 배치로 나눠서 처리
int BATCH = 1000;
for (int i = 0; i < total; i++) {
pipe.set(key, val);
if (i % BATCH == 0) pipe.flush(); // 중간 전송
}
2. 응답 순서 = 명령 순서
pipe.set("k1", "v1"); → results[0] = OK
pipe.incr("counter"); → results[1] = (숫자)
pipe.get("k1"); → results[2] = "v1"
3. 부분 실패 처리
results.forEach(result -> {
if (result instanceof Exception) {
// 이 명령만 실패, 나머지는 성공
}
});
최종 요약
┌─────────────────────────────────────────────────────────┐
│ Pipeline 핵심 원리 │
│ │
│ 본질: 네트워크 왕복(RTT)을 N번 → 1번으로 줄이는 것 │
│ │
│ 방법: 명령들을 클라이언트에서 모아서 │
│ 한 번에 전송 → 한 번에 응답 수신 │
│ │
│ 싱글 스레드 큐와의 관계: │
│ 큐에 한 번에 많이 적재 → 순서대로 처리 │
│ 처리 방식은 동일, 네트워크 대기만 제거 │
│ │
│ 주의: │
│ 원자성 없음 (끼어들기 가능) │
│ 원자성 필요하면 MULTI/EXEC 또는 Lua 사용 │
│ │
└─────────────────────────────────────────────────────────┘
Java Redis Pipeline 예시 코드
기본 사용법
@Service
public class RedisPipelineService {
private final StringRedisTemplate redisTemplate;
// ============================================
// 기본 Pipeline
// ============================================
public List<Object> basicPipeline() {
return redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
connection.set("key1".getBytes(), "value1".getBytes());
connection.set("key2".getBytes(), "value2".getBytes());
connection.set("key3".getBytes(), "value3".getBytes());
return null; // 반드시 null 반환
});
// 결과: [true, true, true]
}
}
실전 예시 1: 대량 저장
// 10,000건 캐시 저장 성능 비교
@Service
public class BulkCacheService {
private final StringRedisTemplate redisTemplate;
private final ObjectMapper objectMapper;
// ❌ Pipeline 없이 (N번 RTT → 느림)
public void saveWithoutPipeline(List<Product> products) {
long start = System.currentTimeMillis();
products.forEach(product -> {
redisTemplate.opsForValue().set(
"product:" + product.getId(),
serialize(product),
Duration.ofMinutes(10)
);
});
System.out.println("개별 저장: " + (System.currentTimeMillis() - start) + "ms");
// 결과: 약 2000ms (10,000건 기준)
}
// ✅ Pipeline 사용 (1번 RTT → 빠름)
public void saveWithPipeline(List<Product> products) {
long start = System.currentTimeMillis();
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
products.forEach(product -> {
byte[] key = ("product:" + product.getId()).getBytes();
byte[] val = serialize(product).getBytes();
connection.setEx(key, 600, val); // TTL 600초 포함
});
return null;
});
System.out.println("Pipeline 저장: " + (System.currentTimeMillis() - start) + "ms");
// 결과: 약 50ms (10,000건 기준)
}
// ✅ 배치 단위 Pipeline (메모리 버퍼 관리)
public void saveWithBatchPipeline(List<Product> products) {
int BATCH_SIZE = 500; // 한 번에 500개씩
for (int i = 0; i < products.size(); i += BATCH_SIZE) {
int end = Math.min(i + BATCH_SIZE, products.size());
List<Product> batch = products.subList(i, end);
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
batch.forEach(product -> {
byte[] key = ("product:" + product.getId()).getBytes();
byte[] val = serialize(product).getBytes();
connection.setEx(key, 600, val);
});
return null;
});
System.out.println((i + batch.size()) + "건 처리 완료");
}
}
}
실전 예시 2: 대량 조회
// 여러 키 한 번에 조회
public Map<String, String> multiGet(List<Long> productIds) {
// ✅ Pipeline으로 대량 GET
List<Object> results = redisTemplate.executePipelined(
(RedisCallback<Object>) connection -> {
productIds.forEach(id -> {
connection.get(("product:" + id).getBytes());
});
return null;
}
);
// 결과 매핑
Map<String, String> resultMap = new HashMap<>();
for (int i = 0; i < productIds.size(); i++) {
Object result = results.get(i);
if (result != null) {
resultMap.put("product:" + productIds.get(i), (String) result);
}
}
return resultMap;
}
실전 예시 3: 응답 순서 활용
// Pipeline 응답은 명령 순서와 동일
public void pipelineWithResults() {
List<Object> results = redisTemplate.executePipelined(
(RedisCallback<Object>) connection -> {
connection.set("name".getBytes(), "Alice".getBytes()); // 명령 0
connection.incr("visit_count".getBytes()); // 명령 1
connection.lPush("recent".getBytes(), // 명령 2
"page1".getBytes());
connection.get("name".getBytes()); // 명령 3
return null;
}
);
// 명령 순서 = 응답 순서 보장
Boolean setResult = (Boolean) results.get(0); // SET 결과
Long incrResult = (Long) results.get(1); // INCR 결과
Long pushResult = (Long) results.get(2); // LPUSH 결과
String getResult = (String) results.get(3); // GET 결과
System.out.println("SET: " + setResult); // true
System.out.println("INCR: " + incrResult); // 1
System.out.println("LPUSH: " + pushResult); // 1
System.out.println("GET: " + getResult); // "Alice"
}
실전 예시 4: 부분 실패 처리
// Pipeline은 원자성 없음 → 부분 실패 처리 필요
public void handlePartialFailure(List<String> keys) {
List<Object> results = redisTemplate.executePipelined(
(RedisCallback<Object>) connection -> {
keys.forEach(key -> connection.get(key.getBytes()));
return null;
}
);
List<String> successKeys = new ArrayList<>();
List<String> failedKeys = new ArrayList<>();
for (int i = 0; i < results.size(); i++) {
if (results.get(i) instanceof Exception) {
// 이 명령만 실패
failedKeys.add(keys.get(i));
log.error("키 조회 실패: {}", keys.get(i));
} else {
successKeys.add(keys.get(i));
}
}
// 실패한 키는 DB에서 재조회
if (!failedKeys.isEmpty()) {
loadFromDatabase(failedKeys);
}
}
실전 예시 5: 캐시 워밍업 (실무 패턴)
// 서버 시작 시 DB 데이터를 Redis로 대량 로딩
@Component
public class CacheWarmupService {
private final StringRedisTemplate redisTemplate;
private final ProductRepository productRepository;
@EventListener(ApplicationReadyEvent.class)
public void warmup() {
log.info("캐시 워밍업 시작");
long start = System.currentTimeMillis();
// DB에서 전체 조회
List<Product> products = productRepository.findAll();
// Pipeline으로 일괄 캐싱
int BATCH_SIZE = 1000;
int total = products.size();
for (int i = 0; i < total; i += BATCH_SIZE) {
List<Product> batch = products.subList(
i, Math.min(i + BATCH_SIZE, total)
);
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
batch.forEach(product -> {
String key = "product:" + product.getId();
String val = serialize(product);
connection.setEx(
key.getBytes(),
3600, // 1시간 TTL
val.getBytes()
);
});
return null;
});
log.info("워밍업 진행: {}/{}", Math.min(i + BATCH_SIZE, total), total);
}
log.info("캐시 워밍업 완료: {}ms, {}건",
System.currentTimeMillis() - start, total);
}
}
성능 비교 테스트
@SpringBootTest
class PipelinePerformanceTest {
@Autowired StringRedisTemplate redisTemplate;
@Test
void performanceComparison() {
int COUNT = 10_000;
// ─── 개별 호출 ───
long start = System.currentTimeMillis();
for (int i = 0; i < COUNT; i++) {
redisTemplate.opsForValue().set("key:" + i, "val:" + i);
}
System.out.println("개별 호출: " + (System.currentTimeMillis() - start) + "ms");
// ─── Pipeline (전체) ───
start = System.currentTimeMillis();
redisTemplate.executePipelined((RedisCallback<Object>) conn -> {
for (int i = 0; i < COUNT; i++) {
conn.set(("key:" + i).getBytes(), ("val:" + i).getBytes());
}
return null;
});
System.out.println("Pipeline 전체: " + (System.currentTimeMillis() - start) + "ms");
// ─── Pipeline (배치 1000) ───
start = System.currentTimeMillis();
for (int i = 0; i < COUNT; i += 1000) {
final int from = i;
redisTemplate.executePipelined((RedisCallback<Object>) conn -> {
for (int j = from; j < Math.min(from + 1000, COUNT); j++) {
conn.set(("key:" + j).getBytes(), ("val:" + j).getBytes());
}
return null;
});
}
System.out.println("Pipeline 배치: " + (System.currentTimeMillis() - start) + "ms");
}
}
// 결과 (10,000건, 로컬 Redis 기준):
// 개별 호출: 약 1500~2000ms
// Pipeline 전체: 약 20~30ms ← 약 70배 빠름
// Pipeline 배치: 약 30~50ms ← 메모리 안전
최종 요약
┌─────────────────────────────────────────────────────────┐
│ Pipeline 실무 사용 기준 │
├─────────────────────────────────────────────────────────┤
│ │
│ ✅ 쓸 때: │
│ 대량 저장 / 대량 조회 │
│ 캐시 워밍업 │
│ 원자성 불필요한 배치 작업 │
│ │
│ ⚠️ 배치 크기: │
│ 500 ~ 1000개 단위 권장 │
│ 너무 크면 클라이언트 메모리 문제 │
│ │
│ ❌ 쓰면 안 될 때: │
│ 원자성 필요 → MULTI/EXEC 또는 Lua │
│ 명령 간 의존성 있을 때 (앞 결과로 다음 결정) │
│ │
└─────────────────────────────────────────────────────────┘