//go:build !js // +build !js package main import ( "context" "errors" "flag" "fmt" "io" "log" "net/http" "os" "sync" "time" "connectrpc.com/connect" pb "github.com/chathaway-codes/home-sensors/v2/gen" servicepb "github.com/chathaway-codes/home-sensors/v2/gen/genconnect" "github.com/google/uuid" "github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3/pkg/media" "github.com/pion/webrtc/v3/pkg/media/ivfreader" "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" ) var ( videoFileName = flag.String("in", "/home/charles/Downloads/simpsons_movie_1080p_hddvd_trailer/output.ivf", "Where to load data from; if set to -, stdin will be used") ) func withAuth[T any](token string, v *T) *connect.Request[T] { req := connect.NewRequest[T](v) req.Header().Add("Authorization", "Bearer "+token) return req } func main() { //nolint flag.Parse() ctx := context.Background() /*httpClient := &http.Client{ Transport: &http2.Transport{ AllowHTTP: true, DialTLS: func(network, addr string, _ *tls.Config) (net.Conn, error) { // If you're also using this client for non-h2c traffic, you may want // to delegate to tls.Dial if the network isn't TCP or the addr isn't // in an allowlist. log.Printf("Connecting to %s : %s", network, addr) return net.Dial(network, addr) }, // Don't forget timeouts! }, }*/ vid, err := newVideo(ctx) if err != nil { log.Fatalf("Failed to start video: %v", err) } client := servicepb.NewSignalerServiceClient(http.DefaultClient, "http://192.168.0.65:8080/") authToken, err := client.CreateAuthToken(ctx, connect.NewRequest(&pb.CreateAuthTokenRequest{ Home: "home1234", Type: &pb.CreateAuthTokenRequest_Camera_{ Camera: &pb.CreateAuthTokenRequest_Camera{ Id: "movie", }, }, })) if err != nil { log.Fatalf("Failed to get auth token: %v", err) } token := authToken.Msg.GetToken() log.Printf("Got token %s", prototext.Format(authToken.Msg)) // Create a new RTCPeerConnection log.Printf("Waiting for connections") for { // Wait for a session request session, err := client.PopSession(ctx, withAuth(token, &pb.PopSessionRequest{})) if err != nil { log.Fatalf("error creating session: %v", err) } go handleSession(ctx, client, token, session, vid) } } type video struct { mu sync.Mutex listeners map[string]chan<- []byte codec string } func newVideo(ctx context.Context) (*video, error) { var err error // Assert that we have an audio or video file videoFileName := *videoFileName var videoIn io.Reader if videoFileName == "-" { videoIn = os.Stdin } else { videoIn, err = os.Open(videoFileName) if err != nil { return nil, fmt.Errorf("failed to open %q: %v", videoFileName, err) } } ivf, header, err := ivfreader.NewWith(videoIn) if err != nil { return nil, fmt.Errorf("failed to read video: %v", err) } // Determine video codec var trackCodec string switch header.FourCC { case "AV01": trackCodec = webrtc.MimeTypeAV1 case "VP90": trackCodec = webrtc.MimeTypeVP9 case "VP80": trackCodec = webrtc.MimeTypeVP8 default: return nil, fmt.Errorf("unable to handle FourCC %s", header.FourCC) } vid := &video{ listeners: make(map[string]chan<- []byte), codec: trackCodec, } go func() { // Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as. // This isn't required since the video is timestamped, but we will such much higher loss if we send all at once. // // It is important to use a time.Ticker instead of time.Sleep because // * avoids accumulating skew, just calling time.Sleep didn't compensate for the time spent parsing the data // * works around latency issues with Sleep (see https://github.com/golang/go/issues/44343) ticker := time.NewTicker(time.Millisecond * time.Duration((float32(header.TimebaseNumerator)/float32(header.TimebaseDenominator))*1000)) for ; true; <-ticker.C { frame, _, ivfErr := ivf.ParseNextFrame() if errors.Is(ivfErr, io.EOF) { fmt.Printf("All video frames parsed and sent") } if ivfErr != nil { panic(ivfErr) } vid.mu.Lock() for _, lis := range vid.listeners { lis <- frame } vid.mu.Unlock() } }() return vid, nil } func (v *video) Join() (<-chan []byte, string, func()) { v.mu.Lock() defer v.mu.Unlock() myID := uuid.New().String() ch := make(chan []byte) v.listeners[myID] = ch return ch, v.codec, func() { v.mu.Lock() defer v.mu.Unlock() delete(v.listeners, myID) } } func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, token string, session *connect.Response[pb.Session], vid *video) { var err error log.Printf("New session") peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, }, }, }) // We use the cancel func to signal that the stream is ready iceConnectedCtx, iceConnectedCtxCancel := context.WithCancel(context.Background()) defer func() { if err := peerConnection.Close(); err != nil { fmt.Printf("cannot close peerConnection: %v\n", err) } }() // connect to the video stream; the cleanup is done in the goroutine which // consumes the framess ch, trackCodec, cleanUp := vid.Join() // Create a video track videoTrack, videoTrackErr := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: trackCodec}, "video", "pion") if videoTrackErr != nil { log.Printf("Failed to create video track: %v", err) } rtpSender, err := peerConnection.AddTrack(videoTrack) if err != nil { log.Printf("Failed to add track to connection: %v", err) } // Read incoming RTCP packets // Before these packets are returned they are processed by interceptors. For things // like NACK this needs to be called. go func() { rtcpBuf := make([]byte, 1500) for { if _, _, err := rtpSender.Read(rtcpBuf); err != nil { return } } }() go func() { defer cleanUp() readyToSend := false for frame := range ch { select { case <-iceConnectedCtx.Done(): readyToSend = true default: // do nothing } if !readyToSend { continue } if err := videoTrack.WriteSample(media.Sample{Data: frame, Duration: time.Second}); err != nil { panic(err) } } }() // Set the handler for ICE connection state // This will notify you when the peer has connected/disconnected peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { fmt.Printf("Connection State has changed %s \n", connectionState.String()) if connectionState == webrtc.ICEConnectionStateConnected { iceConnectedCtxCancel() } }) // Set the handler for Peer connection state // This will notify you when the peer has connected/disconnected peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { fmt.Printf("Peer Connection State has changed: %s\n", s.String()) if s == webrtc.PeerConnectionStateFailed { // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. fmt.Println("Peer Connection has gone to failed exiting") return } }) peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) { if i == nil { if _, err := client.CreateIceMessage(ctx, withAuth(token, &pb.CreateIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), IceMessage: &pb.IceMessage{ Type: &pb.IceMessage_NoMoreCandidates{}, }, })); err != nil { log.Fatalf("Error sending done w/ candidates: %v", err) } return } c := i.ToJSON() var usernameFragment *string if c.UsernameFragment != nil { usernameFragment = proto.String(*c.UsernameFragment) } client.CreateIceMessage(ctx, withAuth(token, &pb.CreateIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), IceMessage: &pb.IceMessage{ Type: &pb.IceMessage_Candidate{ Candidate: &pb.IceCandidate{ Candidate: c.Candidate, SdpMid: c.SDPMid, SdpLineIndex: proto.Int32(int32(*c.SDPMLineIndex)), UsernameFragment: usernameFragment, }, }, }, })) }) log.Printf("Spawning helper") // helper which sends answers, waits for // Add ICE candidates from remote for { msg, err := client.PopIceMessage(ctx, withAuth(token, &pb.PopIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), })) if err != nil { log.Printf("failed to pop ice message: %v", err) } //log.Printf("Got ice message: %v", prototext.Format(msg.Msg)) switch msg.Msg.Type.(type) { case *pb.IceMessage_Candidate: candidate := msg.Msg.GetCandidate() var sdpMLine *uint16 if candidate.SdpLineIndex != nil { t := uint16(candidate.GetSdpLineIndex()) sdpMLine = &t } if err := peerConnection.AddICECandidate(webrtc.ICECandidateInit{ Candidate: candidate.GetCandidate(), SDPMid: candidate.SdpMid, SDPMLineIndex: sdpMLine, }); err != nil { log.Fatalf("Failed to add ice candidate: %v", err) } // Send back an answer answer, err := peerConnection.CreateAnswer(nil) if err != nil { log.Printf("Candidate failed") continue } if err := peerConnection.SetLocalDescription(answer); err != nil { log.Printf("Failed to set local description: %v", err) } _, err = client.CreateIceMessage(ctx, withAuth(token, &pb.CreateIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), IceMessage: &pb.IceMessage{ Type: &pb.IceMessage_Session{ Session: &pb.IceSessionDescription{ SdpType: int64(answer.Type), Sdp: answer.SDP, }, }, }, })) if err != nil { log.Printf("Failed to send answer: %v", err) } case *pb.IceMessage_Session: iceSession := msg.Msg.GetSession() switch iceSession.SdpType { case int64(webrtc.SDPTypeOffer): offer := webrtc.SessionDescription{ Type: webrtc.SDPType(iceSession.SdpType), SDP: iceSession.Sdp, } if err := peerConnection.SetRemoteDescription(offer); err != nil { log.Fatalf("Failed to set remote description: %v", err) } default: log.Printf("unexpected sdp type: %v", webrtc.SDPType(iceSession.SdpType).String()) } log.Printf("Accepted promise!") case *pb.IceMessage_NoMoreCandidates: // do nothing } } }