다른 명령
마이크 모듈 추가
INMP441(MS3625 모듈)을 Raspberry Pi Zero에 연결하는 핀맵입니다. 배선 (INMP441 → RPi Zero)
|INMP441 핀 |RPi Zero 핀 (물리번호) |GPIO | |----------|--------------------------------------|-------| |VDD |Pin 1 (3.3V) |- | |GND |Pin 6 (GND) |- | |L/R |Pin 6 (GND) → 좌채널 / Pin 1 (3.3V) → 우채널|- | |WS (LRCLK)|Pin 35 |GPIO 19| |SCK (BCLK)|Pin 12 |GPIO 18| |SD (DOUT) |Pin 38 |GPIO 20|
마이크 하나만 쓸 경우 L/R은 보통 GND로 묶어 좌채널로 설정합니다. 두 개를 스테레오로 쓸 거면 한쪽은 GND(좌), 다른 쪽은 3.3V(우)로 묶고 SD 핀끼리 합쳐서 GPIO 20 한 줄로 보내면 됩니다. 납땜 팁 • RPi Zero가 W(헤더 없는) 버전이면 먼저 40핀 헤더를 납땜하거나, 위 6개 홀에 직접 와이어를 연결. WH 버전이면 헤더에 듀폰 점퍼선으로 먼저 테스트 후 영구 연결 권장. • 와이어는 가능한 한 짧게(15cm 이내) — I2S 클럭이 길어지면 노이즈 탑니다. 길게 빼야 하면 차폐선 또는 트위스트 페어로. • 0.5mm 무연 솔더 + 인두 온도 320~340℃. 패드와 와이어 양쪽 다 미리 주석도금(pre-tinning) 후 접합하면 깔끔합니다. • INMP441 모듈 핀이 2.54mm 피치라 헤더핀 꽂아서 점퍼선으로 먼저 동작 확인하고, 안정되면 직결 납땜하는 게 안전합니다. • 납땜 후 멀티미터로 VDD-GND 단락 여부, 각 신호선 도통 먼저 확인하고 전원 인가. 연결 후 활성화 (/boot/config.txt)
dtparam=i2s=on dtoverlay=googlevoicehat-soundcard
또는 dtoverlay=i2s-mmap 후 arecord -l로 카드 인식 확인. arecord -D plughw:1 -c 1 -r 48000 -f S32_LE test.wav 로 녹음 테스트 하시면 됩니다.
프로그램 소스
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
“””
INMP441 I2S 마이크 → Whisper STT → Claude 응답
Raspberry Pi Zero WH + INMP441 환경 기준
“””
import os
import sys
import time
import wave
import audioop
import tempfile
import subprocess
from pathlib import Path
from openai import OpenAI
import anthropic
# ============================================================
# 환경설정
# ============================================================
OPENAI_API_KEY = os.getenv(“OPENAI_API_KEY”)
ANTHROPIC_API_KEY = os.getenv(“ANTHROPIC_API_KEY”)
# arecord 디바이스 (arecord -l 로 확인 후 카드 번호 맞추기)
ALSA_DEVICE = os.getenv(“ALSA_DEVICE”, “plughw:1,0”)
# 오디오 포맷
SAMPLE_RATE = 16000 # Whisper 권장값
CHANNELS = 1
SAMPLE_WIDTH = 2 # 16-bit PCM (plughw가 32→16 자동 변환)
# VAD (Voice Activity Detection) 파라미터
CHUNK_MS = 100 # 100ms 단위로 검사
CHUNK_BYTES = int(SAMPLE_RATE * CHUNK_MS / 1000) * SAMPLE_WIDTH
SILENCE_RMS_THRESHOLD = 600 # 환경에 맞춰 튜닝 (조용한 방 300, 시끄러우면 1000+)
START_RMS_THRESHOLD = 900 # 발화 시작 판단 임계치
SILENCE_END_SEC = 1.5 # 이만큼 무음 지속 시 발화 종료로 판단
MAX_RECORD_SEC = 30 # 최대 녹음 길이
MIN_SPEECH_SEC = 0.4 # 너무 짧은 입력은 무시
# Claude 모델
CLAUDE_MODEL = “claude-opus-4-5”
# ============================================================
# 1) I2S 마이크에서 발화 단위로 녹음
# ============================================================
def record_utterance(out_wav: str) -> bool:
“””
arecord를 파이프로 띄워 실시간으로 RMS를 측정하면서
- 발화 시작이 감지되면 버퍼링 시작
- 무음이 SILENCE_END_SEC 이상 지속되면 종료
“””
cmd = [
“arecord”,
“-D”, ALSA_DEVICE,
“-c”, str(CHANNELS),
“-r”, str(SAMPLE_RATE),
“-f”, “S16_LE”,
“-t”, “raw”,
“-q”,
]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
```
print("🎤 듣는 중... (말씀하세요)", flush=True)
frames = []
started = False
silence_chunks = 0
silence_limit = int(SILENCE_END_SEC * 1000 / CHUNK_MS)
start_time = None
pre_buffer = [] # 발화 직전 짧은 구간도 함께 저장 (앞 음 잘림 방지)
pre_buffer_max = 3 # 약 300ms
try:
while True:
data = proc.stdout.read(CHUNK_BYTES)
if not data or len(data) < CHUNK_BYTES:
break
rms = audioop.rms(data, SAMPLE_WIDTH)
if not started:
pre_buffer.append(data)
if len(pre_buffer) > pre_buffer_max:
pre_buffer.pop(0)
if rms > START_RMS_THRESHOLD:
started = True
start_time = time.time()
frames.extend(pre_buffer)
frames.append(data)
print("🔴 녹음 시작", flush=True)
else:
frames.append(data)
if rms < SILENCE_RMS_THRESHOLD:
silence_chunks += 1
else:
silence_chunks = 0
if silence_chunks >= silence_limit:
print("⏹️ 녹음 종료", flush=True)
break
if time.time() - start_time > MAX_RECORD_SEC:
print("⏹️ 최대 길이 도달", flush=True)
break
finally:
proc.terminate()
try:
proc.wait(timeout=1)
except subprocess.TimeoutExpired:
proc.kill()
if not started:
return False
duration = (len(frames) * CHUNK_BYTES) / (SAMPLE_RATE * SAMPLE_WIDTH)
if duration < MIN_SPEECH_SEC:
return False
with wave.open(out_wav, "wb") as wf:
wf.setnchannels(CHANNELS)
wf.setsampwidth(SAMPLE_WIDTH)
wf.setframerate(SAMPLE_RATE)
wf.writeframes(b"".join(frames))
return True
```
# ============================================================
# 2) Whisper API로 STT (한국어 + 영어 코드스위칭 자동)
# ============================================================
def transcribe(audio_path: str) -> str:
client = OpenAI(api_key=OPENAI_API_KEY)
with open(audio_path, “rb”) as f:
result = client.audio.transcriptions.create(
model=“whisper-1”,
file=f,
language=“ko”, # 주 언어 힌트. 영어 단어 섞여 있으면 자동 인식
prompt=“한국어 위주이고 영어 기술 용어가 섞일 수 있습니다. “
“예: Oracle, RAC, Raspberry Pi, GPIO, INMP441.”,
temperature=0.0,
)
return result.text.strip()
# ============================================================
# 3) Claude API에 전달해서 응답 받기
# ============================================================
def ask_claude(user_text: str, history: list) -> str:
client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
history.append({“role”: “user”, “content”: user_text})
msg = client.messages.create(
model=CLAUDE_MODEL,
max_tokens=1024,
system=(“당신은 한국어로 답하는 음성 비서입니다. “
“사용자의 발화는 음성인식 결과라 오타·동음이의어가 있을 수 있으니 “
“맥락으로 보정해서 이해해 주세요. 간결하게 답하세요.”),
messages=history,
)
answer = msg.content[0].text
history.append({“role”: “assistant”, “content”: answer})
# 컨텍스트 비대화 방지: 최근 10턴만 유지
if len(history) > 20:
del history[: len(history) - 20]
return answer
# ============================================================
# 메인 루프
# ============================================================
def main():
if not OPENAI_API_KEY:
sys.exit(“환경변수 OPENAI_API_KEY 가 필요합니다.”)
if not ANTHROPIC_API_KEY:
sys.exit(“환경변수 ANTHROPIC_API_KEY 가 필요합니다.”)
```
print("=" * 60)
print(" INMP441 → Whisper → Claude 음성 어시스턴트")
print(f" device : {ALSA_DEVICE}")
print(f" model : {CLAUDE_MODEL}")
print(" Ctrl+C 로 종료")
print("=" * 60)
history = []
while True:
try:
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
wav_path = tmp.name
ok = record_utterance(wav_path)
if not ok:
Path(wav_path).unlink(missing_ok=True)
continue
print("🔄 STT 변환 중...", flush=True)
text = transcribe(wav_path)
Path(wav_path).unlink(missing_ok=True)
if not text:
print("(인식 실패)\n")
continue
print(f"📝 사용자: {text}", flush=True)
print("🤖 Claude 응답 생성 중...", flush=True)
answer = ask_claude(text, history)
print(f"💬 Claude: {answer}")
print("-" * 60)
except KeyboardInterrupt:
print("\n종료합니다.")
break
except Exception as e:
print(f"⚠️ 오류: {type(e).__name__}: {e}")
time.sleep(1)
```
if **name** == “**main**”:
main()