//go:build !js // +build !js package main import ( "context" "flag" "fmt" "net/http" "os" "time" "connectrpc.com/connect" pb "github.com/chathaway-codes/home-sensors/v2/gen" servicepb "github.com/chathaway-codes/home-sensors/v2/gen/genconnect" "github.com/chathaway-codes/home-sensors/v2/internal/video" "github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3/pkg/media" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "google.golang.org/protobuf/proto" ) func withAuth[T any](token string, v *T) *connect.Request[T] { req := connect.NewRequest[T](v) req.Header().Add("Authorization", "Bearer "+token) return req } func main() { log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) flag.Parse() ctx := context.Background() vid, err := video.Default.Get() if err != nil { log.Fatal().Err(err).Msg("failed to get default video") } client := servicepb.NewSignalerServiceClient(http.DefaultClient, "http://192.168.0.65:8080/") authToken, err := client.CreateAuthToken(ctx, connect.NewRequest(&pb.CreateAuthTokenRequest{ Home: "home1234", Type: &pb.CreateAuthTokenRequest_Camera_{ Camera: &pb.CreateAuthTokenRequest_Camera{ Id: "movie", }, }, })) if err != nil { log.Fatal().Err(err).Msg("failed to get auth token") } token := authToken.Msg.GetToken() go vid.Run() defer vid.Done() // Create a new RTCPeerConnection log.Info().Msg("waiting for connections") for { // Wait for a session request session, err := client.PopSession(ctx, withAuth(token, &pb.PopSessionRequest{})) if err != nil { log.Fatal().Err(err).Msg("error creating session") } go handleSession(ctx, client, token, session, vid) } } func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, token string, session *connect.Response[pb.Session], vid *video.Video) { var err error log.Debug().Msg("new session") peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, }, }, }) // We use the cancel func to signal that the stream is ready iceConnectedCtx, iceConnectedCtxCancel := context.WithCancel(context.Background()) defer func() { if err := peerConnection.Close(); err != nil { log.Debug().Err(err).Msg("cannot close peerConnection") } }() // connect to the video stream; the cleanup is done in the goroutine which // consumes the framess ch, trackCodec, cleanUp := vid.Join() // Create a video track videoTrack, videoTrackErr := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: trackCodec}, "video", "pion") if videoTrackErr != nil { log.Info().Err(err).Msg("Failed to create video track") } rtpSender, err := peerConnection.AddTrack(videoTrack) if err != nil { log.Info().Err(err).Msg("Failed to add track to connection") } // Read incoming RTCP packets // Before these packets are returned they are processed by interceptors. For things // like NACK this needs to be called. go func() { rtcpBuf := make([]byte, 1500) for { if _, _, err := rtpSender.Read(rtcpBuf); err != nil { return } } }() go func() { defer cleanUp() readyToSend := false for frame := range ch { select { case <-iceConnectedCtx.Done(): readyToSend = true default: // do nothing } if !readyToSend { continue } if err := videoTrack.WriteSample(media.Sample{Data: frame, Duration: time.Second}); err != nil { panic(err) } } }() // Set the handler for ICE connection state // This will notify you when the peer has connected/disconnected peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { log.Debug().Msgf("Connection State has changed %s \n", connectionState.String()) if connectionState == webrtc.ICEConnectionStateConnected { iceConnectedCtxCancel() } }) // Set the handler for Peer connection state // This will notify you when the peer has connected/disconnected peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { log.Debug().Msgf("Peer Connection State has changed: %s\n", s.String()) if s == webrtc.PeerConnectionStateFailed { // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. fmt.Println("Peer Connection has gone to failed exiting") return } }) peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) { if i == nil { if _, err := client.CreateIceMessage(ctx, withAuth(token, &pb.CreateIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), IceMessage: &pb.IceMessage{ Type: &pb.IceMessage_NoMoreCandidates{}, }, })); err != nil { log.Warn().Err(err).Msg("error sending done w/ candidates") } return } c := i.ToJSON() var usernameFragment *string if c.UsernameFragment != nil { usernameFragment = proto.String(*c.UsernameFragment) } client.CreateIceMessage(ctx, withAuth(token, &pb.CreateIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), IceMessage: &pb.IceMessage{ Type: &pb.IceMessage_Candidate{ Candidate: &pb.IceCandidate{ Candidate: c.Candidate, SdpMid: c.SDPMid, SdpLineIndex: proto.Int32(int32(*c.SDPMLineIndex)), UsernameFragment: usernameFragment, }, }, }, })) }) log.Info().Msg("Spawning helper") // helper which sends answers, waits for // Add ICE candidates from remote for { msg, err := client.PopIceMessage(ctx, withAuth(token, &pb.PopIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), })) if err != nil { log.Info().Err(err).Msg("failed to pop ice message") continue } switch msg.Msg.Type.(type) { case *pb.IceMessage_Candidate: candidate := msg.Msg.GetCandidate() var sdpMLine *uint16 if candidate.SdpLineIndex != nil { t := uint16(candidate.GetSdpLineIndex()) sdpMLine = &t } if err := peerConnection.AddICECandidate(webrtc.ICECandidateInit{ Candidate: candidate.GetCandidate(), SDPMid: candidate.SdpMid, SDPMLineIndex: sdpMLine, }); err != nil { log.Warn().Err(err).Msg("failed to add ice candidate") } // Send back an answer answer, err := peerConnection.CreateAnswer(nil) if err != nil { log.Debug().Msg("Candidate failed") continue } if err := peerConnection.SetLocalDescription(answer); err != nil { log.Info().Err(err).Msg("Failed to set local description") } _, err = client.CreateIceMessage(ctx, withAuth(token, &pb.CreateIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), IceMessage: &pb.IceMessage{ Type: &pb.IceMessage_Session{ Session: &pb.IceSessionDescription{ SdpType: int64(answer.Type), Sdp: answer.SDP, }, }, }, })) if err != nil { log.Info().Err(err).Msg("Failed to send answer") } case *pb.IceMessage_Session: iceSession := msg.Msg.GetSession() switch iceSession.SdpType { case int64(webrtc.SDPTypeOffer): offer := webrtc.SessionDescription{ Type: webrtc.SDPType(iceSession.SdpType), SDP: iceSession.Sdp, } if err := peerConnection.SetRemoteDescription(offer); err != nil { log.Warn().Err(err).Msg("failed to set remote description") } default: log.Info().Msgf("unexpected sdp type: %v", webrtc.SDPType(iceSession.SdpType).String()) } log.Info().Msg("Accepted promise!") case *pb.IceMessage_NoMoreCandidates: // do nothing } } }