Splunk REST API로 서비스 간 병목을 추적하기
왜 웹 UI만으로는 부족했나
Splunk를 쓴다고 해서 분석이 자동으로 체계화되는 건 아니다. 웹 UI에서 즉석으로 검색하는 방식은 빠른 확인에는 좋지만, 반복 비교와 병목 추적에는 금방 한계가 온다.
| 상황 | 웹 UI | REST API |
|---|---|---|
| 같은 쿼리 반복 실행 | 사람마다 시간 범위, 필터 조건이 달라짐 | 쿼리와 파라미터를 코드로 고정 |
| 배포 전후 비교 | 탭 두 개 열어서 눈으로 비교 | 동일 쿼리, 다른 시간 범위로 자동 비교 |
| 결과 공유 | 스크린샷 + 설명 | CSV/JSON 파일 또는 재실행 가능한 스크립트 공유 |
| 주기적 모니터링 | 매번 수동 검색 | cron이나 CI에서 자동 실행 |
| 여러 환경 비교 | 환경별로 UI 접속 | 환경 파라미터만 바꿔서 실행 |
핵심은 Splunk를 더 “잘 쓰는 법”이 아니라 분석 과정 자체를 재현 가능하게 만드는 것이었다.
Splunk REST API 기본 구조
Splunk REST API로 검색하는 과정은 크게 세 단계다:
검색 작업(Job) 생성
SPL(Search Processing Language) 쿼리를 보내고 검색 작업을 시작한다.
작업 완료 대기
비동기로 실행되는 검색 작업이 끝날 때까지 polling한다.
결과 가져오기
완료된 작업의 결과를 JSON이나 CSV로 받는다.
Python으로 구현하기
REST API를 직접 호출하는 방법과, Splunk에서 제공하는 공식 SDK(splunk-sdk)를 사용하는 방법이 있다. SDK를 쓰면 인증, Job 생성, polling, 결과 파싱을 직접 구현할 필요가 없어 코드가 훨씬 간결해진다.
REST API 직접 호출
REST API를 직접 호출하면 외부 의존성 없이 requests만으로 구현할 수 있다. Job 생성 → polling → 결과 수집 3단계를 직접 관리해야 한다.
# 개념 예시 — REST API 직접 호출 방식
import requests
import time
class SplunkClient:
def __init__(self, base_url: str, token: str):
self.base_url = base_url
self.headers = {"Authorization": f"Bearer {token}"}
def search(self, query: str, earliest: str = "-1h", latest: str = "now") -> list:
job_id = self._create_job(query, earliest, latest)
self._wait_for_completion(job_id)
return self._get_results(job_id)
def _create_job(self, query: str, earliest: str, latest: str) -> str:
response = requests.post(
f"{self.base_url}/services/search/jobs",
headers=self.headers,
data={
"search": f"search {query}",
"earliest_time": earliest,
"latest_time": latest,
"output_mode": "json"
}
)
return response.json()["sid"]
def _wait_for_completion(self, job_id: str, timeout: int = 120):
start = time.time()
while time.time() - start < timeout:
response = requests.get(
f"{self.base_url}/services/search/jobs/{job_id}",
headers=self.headers,
params={"output_mode": "json"}
)
if response.json()["entry"][0]["content"]["isDone"]:
return
time.sleep(2)
raise TimeoutError(f"Job {job_id} did not complete in {timeout}s")
def _get_results(self, job_id: str) -> list:
response = requests.get(
f"{self.base_url}/services/search/jobs/{job_id}/results",
headers=self.headers,
params={"output_mode": "json", "count": 0}
)
return response.json().get("results", [])실제 활용 예시: 특정 API 응답 시간 분석
# API별 응답 시간 분포 분석
splunk = SplunkClient(base_url="https://splunk.example.com:8089", token="...")
results = splunk.search(
query="""
index=container_logs sourcetype="ecs-service-*"
uri="/api/v1/users*" method=GET
| stats count, avg(elapsed_ms) as avg_ms,
p50(elapsed_ms) as p50_ms,
p95(elapsed_ms) as p95_ms,
p99(elapsed_ms) as p99_ms,
max(elapsed_ms) as max_ms
by uri
| sort -avg_ms
""",
earliest="-24h",
latest="now"
)
for r in results:
print(f"{r['uri']}: avg={r['avg_ms']}ms, p95={r['p95_ms']}ms, p99={r['p99_ms']}ms")UI보다 프로그래밍 방식이 빠른 이유
단순히 “API 호출이 더 빠르다”가 아니다. 프로그래밍 방식의 진짜 장점은 분석 워크플로우 전체를 자동화할 수 있다는 점이다.
예시 1: 배포 전후 자동 비교
def compare_before_after(splunk: SplunkClient, deploy_time: str, api_path: str):
"""배포 전후 24시간의 응답 시간을 비교한다."""
before = splunk.search(
query=f'index=container_logs uri="{api_path}" | stats avg(elapsed_ms) as avg_ms, p95(elapsed_ms) as p95',
earliest=f"{deploy_time}-24h",
latest=deploy_time
)
after = splunk.search(
query=f'index=container_logs uri="{api_path}" | stats avg(elapsed_ms) as avg_ms, p95(elapsed_ms) as p95',
earliest=deploy_time,
latest=f"{deploy_time}+24h"
)
before_avg = float(before[0]["avg_ms"])
after_avg = float(after[0]["avg_ms"])
improvement = ((before_avg - after_avg) / before_avg) * 100
print(f"Before: avg {before_avg:.0f}ms")
print(f"After: avg {after_avg:.0f}ms")
print(f"Improvement: {improvement:.1f}%")
# 사용
compare_before_after(splunk, "2025-12-15T10:00:00", "/api/v1/users")웹 UI에서 같은 분석을 하려면 시간 범위를 수동으로 두 번 바꾸고, 결과를 각각 기록하고, 비율을 계산해야 한다. 스크립트는 이걸 한 번 실행으로 끝낸다.
예시 2: Trace ID 기반 호출 체인 추적
def trace_request_chain(splunk: SplunkClient, trace_id: str):
"""하나의 요청이 여러 서비스를 거치는 흐름을 추적한다."""
results = splunk.search(
query=f"""
index=container_logs trace_id="{trace_id}"
| sort _time
| table _time, service_name, uri, method, status, elapsed_ms
""",
earliest="-7d"
)
total_ms = 0
for step in results:
ms = float(step.get("elapsed_ms", 0))
total_ms += ms
print(f" {step['service_name']:20s} {step['method']:6s} {step['uri']:40s} {ms:>8.0f}ms")
print(f"\n Total: {total_ms:.0f}ms")
# 특정 trace의 전체 흐름을 한눈에
trace_request_chain(splunk, "abc123-def456-ghi789")예시 3: 느린 쿼리 Top 10 자동 리포트
def slow_queries_report(splunk: SplunkClient, threshold_ms: int = 5000):
"""일정 시간 이상 걸린 요청의 Top 10을 뽑는다."""
results = splunk.search(
query=f"""
index=container_logs elapsed_ms>{threshold_ms}
| stats count as slow_count,
avg(elapsed_ms) as avg_slow_ms,
max(elapsed_ms) as max_ms
by uri, method
| sort -slow_count
| head 10
""",
earliest="-24h"
)
print(f"=== Slow Queries Report (>{threshold_ms}ms, Last 24h) ===\n")
for i, r in enumerate(results, 1):
print(f" {i:2d}. {r['method']:6s} {r['uri']}")
print(f" Count: {r['slow_count']}, Avg: {r['avg_slow_ms']}ms, Max: {r['max_ms']}ms\n")실제로 어떻게 분석했나
검색 작업 생성
특정 API와 시간 구간을 기준으로 Splunk search job을 생성한다.
공통 키 기반 결과 수집
요청 흐름을 식별할 수 있는 trace ID나 사용자 맥락 키 중심으로 결과를 수집한다.
응답 시간 분포 분석
응답 시간 분포와 호출 체인을 나눠서 본다.
병목 위치 분리
한 서비스 내부 문제인지, 여러 서비스 경계를 넘는 문제인지 분리한다.
어떤 질문에 답할 수 있었나
- 특정 페이지가 느릴 때 실제로 가장 오래 걸린 구간은 어디인가
- 병목이 단일 서비스 안에 있는가, 여러 서비스 호출 체인 사이에 있는가
- 데이터 양이 늘어날수록 응답 시간이 어떤 패턴으로 커지는가
이 질문들에 답할 수 있게 되자, 성능 이슈가 우선순위를 붙일 수 있는 작업 목록으로 바뀌었다.
트레이드오프
- 로그만으로는 DB 내부 상태나 캐시 적중률까지 완전히 보이지 않는다
- 검색 스크립트가 많아질수록 쿼리 관리도 또 다른 자산이 된다
- API 기반 분석은 결국 좋은 로그 필드가 있어야만 의미가 있다
- Splunk API 인증 토큰 관리와 접근 권한 설정이 필요하다
병목 분석 도구의 가치는 대시보드가 예쁜지보다, 같은 질문을 다시 물었을 때 같은 방식으로 답할 수 있는지에 있다. Splunk REST API를 분석 과정에 묶어 넣고 나서야, 다음 단계의 최적화도 훨씬 설득력 있는 순서로 진행할 수 있었다.
다음 글에서는 이 분석 도구로 실제 28초짜리 API의 병목 원인을 분해하고, 어디부터 고쳐야 가장 큰 효과를 볼 수 있는지 개선 순서를 잡은 과정을 정리한다.