//go:build !js // +build !js package main import ( "context" "flag" "fmt" "net/http" "sync" "time" "connectrpc.com/connect" pb "github.com/chathaway-codes/home-sensors/v2/gen" servicepb "github.com/chathaway-codes/home-sensors/v2/gen/genconnect" "github.com/chathaway-codes/home-sensors/v2/internal/h264video" "github.com/chathaway-codes/home-sensors/v2/internal/sensors" "github.com/chathaway-codes/home-sensors/v2/internal/watcher/config" "github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3/pkg/media" "github.com/rs/zerolog/log" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" ) var ( signalerServer = flag.String("signaler_address", "home.chathaway.codes", "address of the signaler") ) func withAuth[T any](token string, v *T) *connect.Request[T] { req := connect.NewRequest[T](v) req.Header().Add("Authorization", "Bearer "+token) return req } func main() { flag.Parse() ctx := context.Background() cfg, err := config.Default.Get() if err != nil { log.Fatal().Err(err).Msg("failed to get config") } client := servicepb.NewSignalerServiceClient( http.DefaultClient, fmt.Sprintf("https://%s/", *signalerServer), connect.WithGRPC(), ) vid, err := h264video.Default.Get() if err != nil { log.Fatal().Err(err).Msg("failed to get default video") } sensors, err := sensors.Default.Get() if err != nil { log.Fatal().Err(err).Msg("failed to get default sensor") } go vid.Run() defer vid.Done() go sensors.Run() defer sensors.Done() sensorCh, sensorDone := sensors.Join() defer sensorDone() token, err := getAuthToken(ctx, client, cfg) if err != nil { log.Fatal().Err(err).Msg("failed to get auth token") } go handleSensor(ctx, client, token, sensorCh) // Create a new RTCPeerConnection log.Info().Msg("waiting for connections") for { // Wait for a session request session, err := client.PopSession(ctx, withAuth(token, &pb.PopSessionRequest{})) if err != nil { code, hasCode := status.FromError(err) if hasCode && code.Code() == codes.NotFound { // try getting a new token token, err = getAuthToken(ctx, client, cfg) if err != nil { log.Fatal().Err(err).Msg("failed to recreate auth token") } continue } log.Error().Err(err).Str("code", fmt.Sprintf("code %d", code.Code())).Bool("has code", hasCode).Msg("error creating session") continue } if session.Msg.GetId() == nil { continue } go handleSession(ctx, client, token, session, vid) } } func getAuthToken(ctx context.Context, client servicepb.SignalerServiceClient, cfg *config.Config) (string, error) { authToken, err := client.CreateAuthToken(ctx, connect.NewRequest(&pb.CreateAuthTokenRequest{ Home: cfg.HomeName, Type: &pb.CreateAuthTokenRequest_Camera_{ Camera: &pb.CreateAuthTokenRequest_Camera{ Id: cfg.CameraName, }, }, })) if err != nil { return "", fmt.Errorf("CreateAuthToken failed: %w", err) } token := authToken.Msg.GetToken() return token, nil } func handleSensor(ctx context.Context, client servicepb.SignalerServiceClient, token string, ch <-chan *pb.Sample) { for { var sample *pb.Sample select { case sample = <-ch: // proceed case <-ctx.Done(): return } if _, err := client.CreateSample(ctx, withAuth(token, &pb.CreateSampleRequest{ Sample: sample, })); err != nil { log.Error().Err(err).Msg("failed to create sample") } } } func getIceConnection(ctx context.Context, peerConnection *webrtc.PeerConnection, client servicepb.SignalerServiceClient, token string, session *connect.Response[pb.Session]) (context.Context, error) { // We use the cancel func to signal that the stream is ready iceConnectedCtx, iceConnectedCtxCancel := context.WithCancel(context.Background()) iceDisconnectedCtx, iceDeconnectedCtxCancel := context.WithCancel(ctx) readyToSend := sync.WaitGroup{} readyToSend.Add(1) // Set the handler for ICE connection state // This will notify you when the peer has connected/disconnected peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { log.Debug().Msgf("Connection State has changed %s \n", connectionState.String()) if connectionState == webrtc.ICEConnectionStateConnected { iceConnectedCtxCancel() } }) // Set the handler for Peer connection state // This will notify you when the peer has connected/disconnected peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { log.Debug().Msgf("Peer Connection State has changed: %s\n", s.String()) if s == webrtc.PeerConnectionStateFailed { // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. iceDeconnectedCtxCancel() } }) peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) { readyToSend.Wait() if i == nil { if _, err := client.CreateIceMessage(ctx, withAuth(token, &pb.CreateIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), IceMessage: &pb.IceMessage{ Type: &pb.IceMessage_NoMoreCandidates{}, }, })); err != nil { log.Warn().Err(err).Msg("error sending done w/ candidates") } return } c := i.ToJSON() var usernameFragment *string if c.UsernameFragment != nil { usernameFragment = proto.String(*c.UsernameFragment) } log.Info().Msgf("got candidate %+v", c) client.CreateIceMessage(ctx, withAuth(token, &pb.CreateIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), IceMessage: &pb.IceMessage{ Type: &pb.IceMessage_Candidate{ Candidate: &pb.IceCandidate{ Candidate: c.Candidate, SdpMid: c.SDPMid, SdpLineIndex: proto.Int32(int32(*c.SDPMLineIndex)), UsernameFragment: usernameFragment, }, }, }, })) }) // Get the offer from the other msg, err := client.PopIceMessage(iceDisconnectedCtx, withAuth(token, &pb.PopIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), })) if err != nil { log.Info().Err(err).Msg("failed to pop ice message") } iceSession := msg.Msg.GetSession() switch iceSession.SdpType { case int64(webrtc.SDPTypeOffer): offer := webrtc.SessionDescription{ Type: webrtc.SDPType(iceSession.SdpType), SDP: iceSession.Sdp, } if err := peerConnection.SetRemoteDescription(offer); err != nil { log.Warn().Err(err).Msg("failed to set remote description") } default: log.Info().Msgf("unexpected sdp type: %v", webrtc.SDPType(iceSession.SdpType).String()) } log.Info().Msg("Accepted promise!") // Send back an answer answer, err := peerConnection.CreateAnswer(nil) if err != nil { log.Debug().Err(err).Msg("Candidate failed") } if err := peerConnection.SetLocalDescription(answer); err != nil { log.Info().Err(err).Msg("Failed to set local description") } _, err = client.CreateIceMessage(iceDisconnectedCtx, withAuth(token, &pb.CreateIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), IceMessage: &pb.IceMessage{ Type: &pb.IceMessage_Session{ Session: &pb.IceSessionDescription{ SdpType: int64(answer.Type), Sdp: answer.SDP, }, }, }, })) if err != nil { log.Info().Err(err).Msg("Failed to send answer") } readyToSend.Done() // TODO: do we add the video right here? // Go into a loop processing ice candidates // Add ICE candidates from remote go func() { for { select { case <-iceDisconnectedCtx.Done(): return default: // check for another message } msg, err := client.PopIceMessage(iceDisconnectedCtx, withAuth(token, &pb.PopIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), })) if err != nil { log.Info().Err(err).Msg("failed to pop ice message") continue } switch msg.Msg.Type.(type) { case *pb.IceMessage_Candidate: candidate := msg.Msg.GetCandidate() var sdpMLine *uint16 if candidate.SdpLineIndex != nil { t := uint16(candidate.GetSdpLineIndex()) sdpMLine = &t } if err := peerConnection.AddICECandidate(webrtc.ICECandidateInit{ Candidate: candidate.GetCandidate(), SDPMid: candidate.SdpMid, SDPMLineIndex: sdpMLine, }); err != nil { log.Warn().Err(err).Msg("failed to add ice candidate") } case *pb.IceMessage_Session: case *pb.IceMessage_NoMoreCandidates: // do nothing } } }() select { case <-iceConnectedCtx.Done(): // Success; return the connection return iceDisconnectedCtx, nil case <-iceDisconnectedCtx.Done(): // No connection; return error return ctx, fmt.Errorf("failed to create connection: ICE disconnected") } } func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, token string, session *connect.Response[pb.Session], vid *h264video.Video) { log.Debug().Msg("new session") // connect to the video stream; the cleanup is done in the goroutine which // consumes the framess ch, trackCodec, cleanUp := vid.Join() defer cleanUp() // Create a video track videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: trackCodec}, "video", "pion") if err != nil { log.Info().Err(err).Msg("Failed to create video track") return } peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, }, }, }) if err != nil { log.Err(err).Msg("failed to make connection") } rtpSender, err := peerConnection.AddTrack(videoTrack) if err != nil { log.Info().Err(err).Msg("Failed to add track to connection") return } disconnectedCtx, err := getIceConnection(ctx, peerConnection, client, token, session) if err != nil { log.Err(err).Msg("failed creating ice connection") } log.Info().Msgf("State is %+v", rtpSender.Transport().State()) // Read incoming RTCP packets // Before these packets are returned they are processed by interceptors. For things // like NACK this needs to be called. go func() { rtcpBuf := make([]byte, 1500) for { select { case <-disconnectedCtx.Done(): return default: // do nothing } if _, _, err := rtpSender.Read(rtcpBuf); err != nil { return } } }() // Start a routine to send frames from a buffer var frame []byte var frameLock sync.Mutex go func() { ticker := time.NewTicker(time.Millisecond * 30) for { select { case <-disconnectedCtx.Done(): return case <-ticker.C: // do nothing } frameLock.Lock() myFrame := frame frame = nil frameLock.Unlock() if myFrame == nil { continue } if err := videoTrack.WriteSample(media.Sample{Data: myFrame, Duration: time.Millisecond * 33}); err != nil { log.Err(err).Msg("failed to write sample") return } } }() for myFrame := range ch { select { case <-disconnectedCtx.Done(): return default: // do nothing } frameLock.Lock() frame = myFrame frameLock.Unlock() } // TODO: Video ended; close the connection }