import asyncio
import json
import logging
import cv2
import numpy as np
import websockets
import pyaudio
import os
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack, MediaStreamTrack
from aiortc.mediastreams import MediaStreamError
from aiortc.sdp import candidate_from_sdp
from av import VideoFrame
from av.audio.frame import AudioFrame
import fractions
# Suppress ALSA warnings: the ALSA C library writes straight to file descriptor 2,
# so the OS-level stderr has to be redirected, not just sys.stderr.
class ALSAErrorSuppress:
    def __enter__(self):
        self._devnull = os.open(os.devnull, os.O_WRONLY)
        self._saved_stderr = os.dup(2)
        os.dup2(self._devnull, 2)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        os.dup2(self._saved_stderr, 2)
        os.close(self._saved_stderr)
        os.close(self._devnull)
# Configuration
CAMERA_DEVICE = 0 # Change if your camera is on different device
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
CAMERA_FPS = 30
WEBSOCKET_PORT = 8765
# Audio settings
AUDIO_SAMPLE_RATE = 48000
AUDIO_CHUNK = 960 # 20ms at 48kHz
AUDIO_CHANNELS = 1
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pi-streamer")
class CameraStreamTrack(VideoStreamTrack):
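    """Video track that captures frames from a camera via OpenCV and falls back
    to a synthetic test pattern when no camera is available."""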
def __init__(self):
super().__init__()
self.cap = None
self.frame_count = 0
self._init_camera()
def _init_camera(self):
try:
self.cap = cv2.VideoCapture(CAMERA_DEVICE)
if self.cap.isOpened():
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)
self.cap.set(cv2.CAP_PROP_FPS, CAMERA_FPS)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
logger.info("✅ Camera ready")
else:
logger.warning("⚠️ Camera not available, using test pattern")
self.cap = None
except Exception as e:
logger.error(f"Camera error: {e}")
self.cap = None
async def recv(self):
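        """Return the next video frame; next_timestamp() supplies pacing and the PTS."""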
pts, time_base = await self.next_timestamp()
if self.cap and self.cap.isOpened():
ret, frame = self.cap.read()
if ret:
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
else:
frame = self._test_pattern()
else:
frame = self._test_pattern()
video_frame = VideoFrame.from_ndarray(frame, format="rgb24")
video_frame.pts = pts
video_frame.time_base = time_base
return video_frame
def _test_pattern(self):
"""Generate test pattern if camera fails"""
self.frame_count += 1
frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
# Gradient background
for y in range(CAMERA_HEIGHT):
frame[y, :] = [y//2, 100, 200 - y//3]
# Moving circle
x = int((self.frame_count * 3) % CAMERA_WIDTH)
y = int(CAMERA_HEIGHT // 2 + 50 * np.sin(self.frame_count * 0.1))
cv2.circle(frame, (x, y), 20, (255, 255, 0), -1)
# Text overlay
cv2.putText(frame, "LIVE TEST PATTERN", (20, 40),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
cv2.putText(frame, f"Frame: {self.frame_count}", (20, 80),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
return frame
    def stop(self):
        super().stop()  # mark the underlying MediaStreamTrack as ended
        if self.cap:
            self.cap.release()
            logger.info("📹 Camera released")
class MicrophoneStreamTrack(MediaStreamTrack):
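    """Audio track that captures 20 ms chunks of 48 kHz mono PCM from a
    microphone via PyAudio and falls back to silence."""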
kind = "audio"
def __init__(self):
super().__init__()
self.sample_rate = AUDIO_SAMPLE_RATE
self.chunk = AUDIO_CHUNK
self.channels = AUDIO_CHANNELS
self.pts = 0
self.time_base = fractions.Fraction(1, self.sample_rate)
self.p = None
self.stream = None
self.running = True
self._init_audio()
def _init_audio(self):
try:
with ALSAErrorSuppress():
self.p = pyaudio.PyAudio()
input_device = self._find_input_device()
if input_device is not None:
with ALSAErrorSuppress():
self.stream = self.p.open(
format=pyaudio.paInt16,
channels=self.channels,
rate=self.sample_rate,
input=True,
input_device_index=input_device,
frames_per_buffer=self.chunk
)
logger.info(f"🎤 Audio input ready (device {input_device})")
else:
logger.warning("⚠️ No audio input found, using silence")
self.stream = None
except Exception as e:
logger.error(f"❌ Audio input failed: {e}")
self.stream = None
def _find_input_device(self):
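        """Probe PyAudio devices and return the index of the first input device
        that can actually be opened with our format, or None if none works."""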
if not self.p:
return None
try:
for i in range(self.p.get_device_count()):
try:
with ALSAErrorSuppress():
device_info = self.p.get_device_info_by_index(i)
if device_info.get('maxInputChannels', 0) > 0:
device_name = device_info.get('name', 'Unknown')
logger.info(f"Testing input device {i}: {device_name}")
try:
with ALSAErrorSuppress():
test_stream = self.p.open(
format=pyaudio.paInt16,
channels=self.channels,
rate=self.sample_rate,
input=True,
input_device_index=i,
frames_per_buffer=self.chunk
)
test_stream.close()
logger.info(f"✅ Device {i} works: {device_name}")
return i
                        except Exception:
                            continue
                except Exception:
                    continue
except Exception as e:
logger.error(f"Error finding input device: {e}")
return None
async def recv(self):
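        """Return one 20 ms audio frame (AUDIO_CHUNK samples), or silence on error."""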
        if not self.running:
            raise MediaStreamError("Audio track stopped")
        try:
            if self.stream and self.running:
                # Blocking read of one chunk; the hardware paces us to ~20 ms.
                with ALSAErrorSuppress():
                    data = self.stream.read(self.chunk, exception_on_overflow=False)
            else:
                # No capture device: send silence, paced to real time so the
                # event loop isn't flooded with frames.
                await asyncio.sleep(self.chunk / self.sample_rate)
                data = np.zeros(self.chunk, dtype=np.int16).tobytes()
frame = AudioFrame(format="s16", layout="mono", samples=self.chunk)
frame.sample_rate = self.sample_rate
frame.planes[0].update(data)
frame.pts = self.pts
frame.time_base = self.time_base
self.pts += self.chunk
return frame
except Exception as e:
logger.warning(f"Audio error: {e}")
silence = np.zeros(self.chunk, dtype=np.int16).tobytes()
frame = AudioFrame(format="s16", layout="mono", samples=self.chunk)
frame.sample_rate = self.sample_rate
frame.planes[0].update(silence)
frame.pts = self.pts
frame.time_base = self.time_base
self.pts += self.chunk
return frame
    def stop(self):
        super().stop()  # mark the underlying MediaStreamTrack as ended
        self.running = False
        if self.stream:
            try:
                with ALSAErrorSuppress():
                    self.stream.stop_stream()
                    self.stream.close()
                logger.info("🎤 Audio input stopped")
            except Exception:
                pass
        if self.p:
            self.p.terminate()
async def handle_client(websocket):
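    """Per-client signaling handler: one WebSocket connection drives one RTCPeerConnection.

    Note: the single-argument handler signature assumes websockets >= 10.1.
    """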
client_id = f"client_{id(websocket)}"
logger.info(f"🔗 {client_id} connected")
pc = None
camera_track = None
audio_track = None
try:
async for message in websocket:
try:
data = json.loads(message)
msg_type = data.get('type', 'unknown')
if msg_type == 'offer':
logger.info("🎬 Processing offer...")
try:
pc = RTCPeerConnection()
camera_track = CameraStreamTrack()
audio_track = MicrophoneStreamTrack()
pc.addTrack(camera_track)
pc.addTrack(audio_track)
logger.info("📹🎤 Video and audio tracks added")
@pc.on("icecandidate")
async def on_ice(candidate):
if candidate:
try:
await websocket.send(json.dumps({
'type': 'ice-candidate',
'candidate': {
'candidate': candidate.candidate,
'sdpMid': candidate.sdpMid,
'sdpMLineIndex': candidate.sdpMLineIndex
}
}))
except Exception as e:
logger.error(f"❌ ICE send failed: {e}")
@pc.on("connectionstatechange")
async def on_state():
state = pc.connectionState
logger.info(f"🔄 State: {state}")
if state == "connected":
logger.info("🎉 CONNECTED! Streaming video + audio!")
elif state == "failed":
logger.error("❌ Connection failed")
logger.info("📥 Setting remote description...")
offer = RTCSessionDescription(sdp=data['sdp'], type=data['type'])
await pc.setRemoteDescription(offer)
logger.info("✅ Remote description set")
logger.info("📤 Creating answer...")
answer = await pc.createAnswer()
await pc.setLocalDescription(answer)
logger.info("✅ Answer created")
response = {
'type': 'answer',
'sdp': pc.localDescription.sdp
}
await websocket.send(json.dumps(response))
logger.info("✅ Answer sent successfully")
except Exception as e:
logger.error(f"❌ Offer processing failed: {e}")
continue
                elif msg_type == 'ice-candidate' and pc:
                    try:
                        cand_data = data.get('candidate')
                        if cand_data and cand_data.get('candidate'):
                            # aiortc's RTCIceCandidate does not parse the raw SDP string,
                            # so convert it with candidate_from_sdp() first.
                            sdp_str = cand_data['candidate']
                            if sdp_str.startswith('candidate:'):
                                sdp_str = sdp_str.split(':', 1)[1]
                            candidate = candidate_from_sdp(sdp_str)
                            candidate.sdpMid = cand_data.get('sdpMid')
                            candidate.sdpMLineIndex = cand_data.get('sdpMLineIndex')
                            await pc.addIceCandidate(candidate)
                            logger.debug("🧊 ICE candidate added")
                    except Exception as e:
                        logger.error(f"❌ ICE candidate error: {e}")
except json.JSONDecodeError:
logger.error("❌ Invalid JSON received")
except Exception as e:
logger.error(f"❌ Message processing error: {e}")
except websockets.exceptions.ConnectionClosed:
logger.info(f"👋 {client_id} disconnected")
except Exception as e:
logger.error(f"❌ Client handler error: {e}")
finally:
logger.info(f"🧹 Cleaning up {client_id}")
if audio_track:
audio_track.stop()
if camera_track:
camera_track.stop()
if pc:
try:
await pc.close()
            except Exception:
                pass
async def main():
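    """Run quick camera/microphone self-tests, then serve WebRTC signaling over WebSockets."""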
print("🚀 Raspberry Pi Video + Audio Streamer")
print("=" * 45)
# Camera test
cap = cv2.VideoCapture(CAMERA_DEVICE)
if cap.isOpened():
ret, frame = cap.read()
if ret:
print(f"✅ Camera OK: {frame.shape}")
else:
print("⚠️ Camera detected but can't read")
cap.release()
else:
print("⚠️ No camera - will use test pattern")
# Audio test
print("🔍 Testing audio devices...")
try:
with ALSAErrorSuppress():
p = pyaudio.PyAudio()
input_devices = []
for i in range(p.get_device_count()):
try:
with ALSAErrorSuppress():
info = p.get_device_info_by_index(i)
if info.get('maxInputChannels', 0) > 0:
input_devices.append(f" Input {i}: {info.get('name', 'Unknown')}")
            except Exception:
                continue
print(f"📥 Input devices found ({len(input_devices)}):")
for device in input_devices:
print(device)
with ALSAErrorSuppress():
p.terminate()
except Exception as e:
print(f"❌ Audio device test failed: {e}")
print(f"🌐 Starting server on port {WEBSOCKET_PORT}")
server = await websockets.serve(handle_client, "0.0.0.0", WEBSOCKET_PORT)
print("✅ Server ready!")
print("📱 Connect from your web browser now")
print("=" * 45)
    await server.wait_closed()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C interrupts asyncio.run() itself, so it is handled out here;
        # an except clause inside main() would never see it.
        print("\n🛑 Shutting down...")
    print("👋 Goodbye!")