From a07a993bab843bcfb3f51eb68d26b4ce90e06d92 Mon Sep 17 00:00:00 2001 From: Charles Hathaway Date: Sun, 1 Oct 2023 20:38:38 -0700 Subject: [PATCH] add: run video in pipe --- cmd/watcher/{main.go => watcher.go} | 168 +++++---------------------- go.mod | 10 +- go.sum | 16 ++- internal/pipespy/pipespy.go | 159 ++++++++++++++++++++++++++ internal/pipespy/pipespy_test.go | 115 +++++++++++++++++++ internal/video/video.go | 170 ++++++++++++++++++++++++++++ internal/watcher/config/config.go | 57 ++++++++++ sample.yaml | 16 +++ 8 files changed, 570 insertions(+), 141 deletions(-) rename cmd/watcher/{main.go => watcher.go} (59%) create mode 100644 internal/pipespy/pipespy.go create mode 100644 internal/pipespy/pipespy_test.go create mode 100644 internal/video/video.go create mode 100644 internal/watcher/config/config.go create mode 100644 sample.yaml diff --git a/cmd/watcher/main.go b/cmd/watcher/watcher.go similarity index 59% rename from cmd/watcher/main.go rename to cmd/watcher/watcher.go index 4cace55..77ed909 100644 --- a/cmd/watcher/main.go +++ b/cmd/watcher/watcher.go @@ -5,57 +5,37 @@ package main import ( "context" - "errors" "flag" "fmt" - "io" - "log" "net/http" "os" - "sync" "time" "connectrpc.com/connect" pb "github.com/chathaway-codes/home-sensors/v2/gen" servicepb "github.com/chathaway-codes/home-sensors/v2/gen/genconnect" - "github.com/google/uuid" + "github.com/chathaway-codes/home-sensors/v2/internal/video" "github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3/pkg/media" - "github.com/pion/webrtc/v3/pkg/media/ivfreader" - "google.golang.org/protobuf/encoding/prototext" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "google.golang.org/protobuf/proto" ) -var ( - videoFileName = flag.String("in", "/home/charles/Downloads/simpsons_movie_1080p_hddvd_trailer/output.ivf", "Where to load data from; if set to -, stdin will be used") -) - func withAuth[T any](token string, v *T) *connect.Request[T] { req := connect.NewRequest[T](v) req.Header().Add("Authorization", "Bearer "+token) return req } -func main() { //nolint +func main() { + log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) flag.Parse() ctx := context.Background() - /*httpClient := &http.Client{ - Transport: &http2.Transport{ - AllowHTTP: true, - DialTLS: func(network, addr string, _ *tls.Config) (net.Conn, error) { - // If you're also using this client for non-h2c traffic, you may want - // to delegate to tls.Dial if the network isn't TCP or the addr isn't - // in an allowlist. - log.Printf("Connecting to %s : %s", network, addr) - return net.Dial(network, addr) - }, - // Don't forget timeouts! - }, - }*/ - vid, err := newVideo(ctx) + vid, err := video.Default.Get() if err != nil { - log.Fatalf("Failed to start video: %v", err) + log.Fatal().Err(err).Msg("failed to get default video") } client := servicepb.NewSignalerServiceClient(http.DefaultClient, "http://192.168.0.65:8080/") authToken, err := client.CreateAuthToken(ctx, connect.NewRequest(&pb.CreateAuthTokenRequest{ @@ -67,115 +47,29 @@ func main() { //nolint }, })) if err != nil { - log.Fatalf("Failed to get auth token: %v", err) + log.Fatal().Err(err).Msg("failed to get auth token") } token := authToken.Msg.GetToken() - log.Printf("Got token %s", prototext.Format(authToken.Msg)) + + go vid.Run() + defer vid.Done() // Create a new RTCPeerConnection - log.Printf("Waiting for connections") + log.Info().Msg("waiting for connections") for { // Wait for a session request session, err := client.PopSession(ctx, withAuth(token, &pb.PopSessionRequest{})) if err != nil { - log.Fatalf("error creating session: %v", err) + log.Fatal().Err(err).Msg("error creating session") } go handleSession(ctx, client, token, session, vid) } } -type video struct { - mu sync.Mutex - listeners map[string]chan<- []byte - codec string -} - -func newVideo(ctx context.Context) (*video, error) { +func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, token string, session *connect.Response[pb.Session], vid *video.Video) { var err error - // Assert that we have an audio or video file - videoFileName := *videoFileName - var videoIn io.Reader - if videoFileName == "-" { - videoIn = os.Stdin - } else { - videoIn, err = os.Open(videoFileName) - if err != nil { - return nil, fmt.Errorf("failed to open %q: %v", videoFileName, err) - } - - } - - ivf, header, err := ivfreader.NewWith(videoIn) - if err != nil { - return nil, fmt.Errorf("failed to read video: %v", err) - } - - // Determine video codec - var trackCodec string - switch header.FourCC { - case "AV01": - trackCodec = webrtc.MimeTypeAV1 - case "VP90": - trackCodec = webrtc.MimeTypeVP9 - case "VP80": - trackCodec = webrtc.MimeTypeVP8 - default: - return nil, fmt.Errorf("unable to handle FourCC %s", header.FourCC) - } - - vid := &video{ - listeners: make(map[string]chan<- []byte), - codec: trackCodec, - } - go func() { - // Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as. - // This isn't required since the video is timestamped, but we will such much higher loss if we send all at once. - // - // It is important to use a time.Ticker instead of time.Sleep because - // * avoids accumulating skew, just calling time.Sleep didn't compensate for the time spent parsing the data - // * works around latency issues with Sleep (see https://github.com/golang/go/issues/44343) - ticker := time.NewTicker(time.Millisecond * time.Duration((float32(header.TimebaseNumerator)/float32(header.TimebaseDenominator))*1000)) - for ; true; <-ticker.C { - frame, _, ivfErr := ivf.ParseNextFrame() - if errors.Is(ivfErr, io.EOF) { - fmt.Printf("All video frames parsed and sent") - } - - if ivfErr != nil { - panic(ivfErr) - } - - vid.mu.Lock() - for _, lis := range vid.listeners { - lis <- frame - } - vid.mu.Unlock() - } - }() - - return vid, nil -} - -func (v *video) Join() (<-chan []byte, string, func()) { - v.mu.Lock() - defer v.mu.Unlock() - - myID := uuid.New().String() - ch := make(chan []byte) - v.listeners[myID] = ch - - return ch, v.codec, func() { - v.mu.Lock() - defer v.mu.Unlock() - - delete(v.listeners, myID) - } -} - -func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, token string, session *connect.Response[pb.Session], vid *video) { - var err error - log.Printf("New session") + log.Debug().Msg("new session") peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ @@ -189,7 +83,7 @@ func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, iceConnectedCtx, iceConnectedCtxCancel := context.WithCancel(context.Background()) defer func() { if err := peerConnection.Close(); err != nil { - fmt.Printf("cannot close peerConnection: %v\n", err) + log.Debug().Err(err).Msg("cannot close peerConnection") } }() @@ -199,12 +93,12 @@ func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, // Create a video track videoTrack, videoTrackErr := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: trackCodec}, "video", "pion") if videoTrackErr != nil { - log.Printf("Failed to create video track: %v", err) + log.Info().Err(err).Msg("Failed to create video track") } rtpSender, err := peerConnection.AddTrack(videoTrack) if err != nil { - log.Printf("Failed to add track to connection: %v", err) + log.Info().Err(err).Msg("Failed to add track to connection") } // Read incoming RTCP packets @@ -241,7 +135,7 @@ func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, // Set the handler for ICE connection state // This will notify you when the peer has connected/disconnected peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { - fmt.Printf("Connection State has changed %s \n", connectionState.String()) + log.Debug().Msgf("Connection State has changed %s \n", connectionState.String()) if connectionState == webrtc.ICEConnectionStateConnected { iceConnectedCtxCancel() } @@ -250,7 +144,7 @@ func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, // Set the handler for Peer connection state // This will notify you when the peer has connected/disconnected peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { - fmt.Printf("Peer Connection State has changed: %s\n", s.String()) + log.Debug().Msgf("Peer Connection State has changed: %s\n", s.String()) if s == webrtc.PeerConnectionStateFailed { // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. @@ -269,7 +163,7 @@ func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, Type: &pb.IceMessage_NoMoreCandidates{}, }, })); err != nil { - log.Fatalf("Error sending done w/ candidates: %v", err) + log.Warn().Err(err).Msg("error sending done w/ candidates") } return } @@ -292,7 +186,7 @@ func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, }, })) }) - log.Printf("Spawning helper") + log.Info().Msg("Spawning helper") // helper which sends answers, waits for @@ -302,9 +196,9 @@ func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, SessionIdentifier: session.Msg.GetId(), })) if err != nil { - log.Printf("failed to pop ice message: %v", err) + log.Info().Err(err).Msg("failed to pop ice message") + continue } - //log.Printf("Got ice message: %v", prototext.Format(msg.Msg)) switch msg.Msg.Type.(type) { case *pb.IceMessage_Candidate: candidate := msg.Msg.GetCandidate() @@ -318,18 +212,18 @@ func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, SDPMid: candidate.SdpMid, SDPMLineIndex: sdpMLine, }); err != nil { - log.Fatalf("Failed to add ice candidate: %v", err) + log.Warn().Err(err).Msg("failed to add ice candidate") } // Send back an answer answer, err := peerConnection.CreateAnswer(nil) if err != nil { - log.Printf("Candidate failed") + log.Debug().Msg("Candidate failed") continue } if err := peerConnection.SetLocalDescription(answer); err != nil { - log.Printf("Failed to set local description: %v", err) + log.Info().Err(err).Msg("Failed to set local description") } _, err = client.CreateIceMessage(ctx, withAuth(token, &pb.CreateIceMessageRequest{ @@ -344,7 +238,7 @@ func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, }, })) if err != nil { - log.Printf("Failed to send answer: %v", err) + log.Info().Err(err).Msg("Failed to send answer") } case *pb.IceMessage_Session: iceSession := msg.Msg.GetSession() @@ -357,12 +251,12 @@ func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, } if err := peerConnection.SetRemoteDescription(offer); err != nil { - log.Fatalf("Failed to set remote description: %v", err) + log.Warn().Err(err).Msg("failed to set remote description") } default: - log.Printf("unexpected sdp type: %v", webrtc.SDPType(iceSession.SdpType).String()) + log.Info().Msgf("unexpected sdp type: %v", webrtc.SDPType(iceSession.SdpType).String()) } - log.Printf("Accepted promise!") + log.Info().Msg("Accepted promise!") case *pb.IceMessage_NoMoreCandidates: // do nothing } diff --git a/go.mod b/go.mod index cabe722..27e0051 100644 --- a/go.mod +++ b/go.mod @@ -6,17 +6,22 @@ require ( connectrpc.com/connect v1.11.1 connectrpc.com/grpcreflect v1.2.0 github.com/gofrs/uuid/v5 v5.0.0 + github.com/google/go-cmp v0.5.9 + github.com/google/uuid v1.3.1 github.com/pion/webrtc/v3 v3.2.20 github.com/rs/cors v1.10.0 + github.com/rs/zerolog v1.31.0 golang.org/x/net v0.14.0 google.golang.org/grpc v1.58.1 google.golang.org/protobuf v1.31.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/google/uuid v1.3.1 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect github.com/pion/datachannel v1.5.5 // indirect github.com/pion/dtls/v2 v2.2.7 // indirect github.com/pion/ice/v2 v2.3.11 // indirect @@ -35,8 +40,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/testify v1.8.4 // indirect golang.org/x/crypto v0.12.0 // indirect - golang.org/x/sys v0.11.0 // indirect + golang.org/x/sys v0.12.0 // indirect golang.org/x/text v0.12.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index c95f59b..d54a676 100644 --- a/go.sum +++ b/go.sum @@ -2,12 +2,14 @@ connectrpc.com/connect v1.11.1 h1:dqRwblixqkVh+OFBOOL1yIf1jS/yP0MSJLijRj29bFg= connectrpc.com/connect v1.11.1/go.mod h1:3AGaO6RRGMx5IKFfqbe3hvK1NqLosFNP2BxDYTPmNPo= connectrpc.com/grpcreflect v1.2.0 h1:Q6og1S7HinmtbEuBvARLNwYmTbhEGRpHDhqrPNlmK+U= connectrpc.com/grpcreflect v1.2.0/go.mod h1:nwSOKmE8nU5u/CidgHtPYk1PFI3U9ignz7iDMxOYkSY= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/uuid/v5 v5.0.0 h1:p544++a97kEL+svbcFbCQVM9KFu0Yo25UoISXGNNH9M= github.com/gofrs/uuid/v5 v5.0.0/go.mod h1:CDOjlDMVAtN56jqyRUZh58JT31Tiw7/oQyEXZV+9bD8= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -35,6 +37,11 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -81,10 +88,14 @@ github.com/pion/turn/v2 v2.1.3 h1:pYxTVWG2gpC97opdRc5IGsQ1lJ9O/IlNhkzj7MMrGAA= github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= github.com/pion/webrtc/v3 v3.2.20 h1:BQJiXQsJq9LgLp3op7rLy1y8d2WD+LtiS9cpY0uQ22A= github.com/pion/webrtc/v3 v3.2.20/go.mod h1:vVURQTBOG5BpWKOJz3nlr23NfTDeyKVmubRNqzQp+Tg= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rs/cors v1.10.0 h1:62NOS1h+r8p1mW6FM0FSB0exioXLhd/sh15KpjWBZ+8= github.com/rs/cors v1.10.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A= +github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -145,15 +156,18 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/internal/pipespy/pipespy.go b/internal/pipespy/pipespy.go new file mode 100644 index 0000000..faeb448 --- /dev/null +++ b/internal/pipespy/pipespy.go @@ -0,0 +1,159 @@ +// Package pipespy provides a structure which connects multiple things, forwarding data, but also +// calling a helper in the middle to spy. +package pipespy + +import ( + "errors" + "fmt" + "io" + "os/exec" + "sync" + + "github.com/rs/zerolog/log" +) + +type process interface { + // Sets the stdin. + SetStdin(io.Reader) + // Sets the stdout. + SetStdout(io.Writer) + + // Start the process; should block until the process completes, then free Stdin and Stdout. + Run() error +} + +type CmdWrap struct { + cmd *exec.Cmd +} + +func NewCmd(cmd *exec.Cmd) *CmdWrap { + return &CmdWrap{ + cmd: cmd, + } +} + +func (s *CmdWrap) SetStdin(r io.Reader) { + s.cmd.Stdin = r +} + +func (s *CmdWrap) SetStdout(w io.Writer) { + s.cmd.Stdout = w +} + +func (s *CmdWrap) Run() error { + if err := s.cmd.Start(); err != nil { + return fmt.Errorf("failed to start command: %w", err) + } + return s.cmd.Wait() +} + +type wrap struct { + proc process + + writeCloser io.WriteCloser +} + +func (s *wrap) SetStdin(r io.ReadCloser) { + s.proc.SetStdin(r) +} + +func (s *wrap) SetStdout(w io.WriteCloser) { + s.writeCloser = w + s.proc.SetStdout(w) +} + +func (s *wrap) Run() error { + defer s.writeCloser.Close() + return s.proc.Run() +} + +type Pipe struct { + processOrder []*Spy +} + +func New() *Pipe { + return &Pipe{} +} + +// Add a process to the pipeline. Use the returned spy object to snoop on it. +// Snoop must be called before Start; will panic otherwise. +func (p *Pipe) Add(proc process) *Spy { + w := &wrap{ + proc: proc, + } + piperReader, pipeWriter := io.Pipe() + w.SetStdout(pipeWriter) + spy := &Spy{ + stdIn: piperReader, + proc: w, + } + p.processOrder = append(p.processOrder, spy) + if l := len(p.processOrder) - 1; l > 0 { + p.processOrder[l].proc.SetStdin(p.processOrder[l-1].Snoop()) + } + return spy +} + +// Starts the pipeline; there is a Go routine created to ensure readers dont stall. +// Make sure to kill the processes, which will cause EOF to propegate. The returned +// function can be used to wait for all to cleanly exit. +func (p *Pipe) Start() func() []error { + errorCh := make(chan error, len(p.processOrder)) + wg := sync.WaitGroup{} + wg.Add(len(p.processOrder)) + for _, s := range p.processOrder { + s.running = true + go func(s *Spy) { + defer wg.Done() + buff := make([]byte, 128) + var err error + for n := 1; err == nil || n == 0; _, err = s.stdIn.Read(buff) { + // do nothing + } + if !errors.Is(err, io.EOF) { + log.Warn().Err(err).Msg("unexpected error from reader") + } + }(s) + go func(s *Spy) { + if err := s.proc.Run(); err != nil { + errorCh <- err + } + + // will this cause some writes to get lost? is that a problem? + for _, closer := range s.closers { + if err := closer(); err != nil { + log.Warn().Err(err).Msg("error closing pipe") + } + } + }(s) + } + return func() []error { + wg.Wait() + close(errorCh) + var errs []error + for err := range errorCh { + errs = append(errs, err) + } + return errs + + } +} + +type Spy struct { + stdIn io.Reader + closers []func() error + proc *wrap + running bool +} + +// Snoop on the Stdout of the process being spied; reader will recieve a copy of the bytes. +func (s *Spy) Snoop() io.ReadCloser { + if s.running { + panic("Call to snoop on a running process") + } + pipeReader, pipeWriter := io.Pipe() + s.closers = append(s.closers, pipeWriter.Close) + r := io.TeeReader(s.stdIn, pipeWriter) + s.stdIn = r + return pipeReader +} diff --git a/internal/pipespy/pipespy_test.go b/internal/pipespy/pipespy_test.go new file mode 100644 index 0000000..5506a38 --- /dev/null +++ b/internal/pipespy/pipespy_test.go @@ -0,0 +1,115 @@ +package pipespy_test + +import ( + "errors" + "fmt" + "io" + "sync" + "testing" + + "github.com/chathaway-codes/home-sensors/v2/internal/pipespy" + "github.com/google/go-cmp/cmp" +) + +func TestPipeSpy(t *testing.T) { + pipe := pipespy.New() + + readPipe, writerPipe := io.Pipe() + stage1 := &simpleWriter{t: t, stdIn: readPipe} + stage2 := &simpleWriter{t: t} + + stdOut1 := pipe.Add(stage1).Snoop() + stdOut2 := pipe.Add(stage2).Snoop() + pipe.Add(&simpleWriter{t: t}) + + // Spawn threads to keep reading the snoop + var buff1, buff2 []byte + var err1, err2 error + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + buff1, err1 = io.ReadAll(stdOut1) + if errors.Is(err1, io.EOF) { + err1 = nil + } + }() + go func() { + defer wg.Done() + buff2, err2 = io.ReadAll(stdOut2) + if errors.Is(err2, io.EOF) { + err2 = nil + } + }() + + wait := pipe.Start() + + msg := "This is a message!" + fmt.Fprint(writerPipe, msg) + if err := writerPipe.Close(); err != nil { + t.Fatalf("failed to close pipe: %v", err) + } + + wait() + wg.Wait() + + if err1 != nil || err2 != nil { + t.Errorf("Got errs %v and %v; want nil or EOF", err1, err2) + } + + if diff := cmp.Diff(string(buff1), msg); diff != "" { + t.Errorf("got diff:\n%s", diff) + } + if diff := cmp.Diff(string(buff2), msg); diff != "" { + t.Errorf("got diff:\n%s", diff) + } +} + +func TestPipeSpyError(t *testing.T) { + readPipe, writerPipe := io.Pipe() + pipe := pipespy.New() + pipe.Add(&simpleWriter{t: t, stdIn: readPipe}) + pipe.Add(&simpleWriter{t: t, err: fmt.Errorf("this is an error")}) + pipe.Add(&simpleWriter{t: t}) + + wait := pipe.Start() + if _, err := fmt.Fprintf(writerPipe, "Hello"); err != nil { + t.Fatalf("error: %v", err) + } + writerPipe.Close() + + errs := wait() + + if len(errs) != 1 { + t.Errorf("Expected 1 err; got %d", len(errs)) + } +} + +type simpleWriter struct { + stdIn io.Reader + stdOut io.Writer + gotData []byte + err error + t *testing.T +} + +func (s *simpleWriter) SetStdin(r io.Reader) { + s.stdIn = r +} + +func (s *simpleWriter) SetStdout(w io.Writer) { + s.stdOut = w +} + +func (s *simpleWriter) Run() error { + if s.err != nil { + return s.err + } + bytes, err := io.ReadAll(s.stdIn) + if err != nil && !errors.Is(err, io.EOF) { + s.t.Errorf("Unexpected err: %v", err) + } + s.gotData = bytes + s.stdOut.Write(bytes) + return nil +} diff --git a/internal/video/video.go b/internal/video/video.go new file mode 100644 index 0000000..1cf05a4 --- /dev/null +++ b/internal/video/video.go @@ -0,0 +1,170 @@ +// Package video implements logic to stream video from a config. +package video + +import ( + "bufio" + "context" + "errors" + "io" + "os/exec" + "sync" + + "github.com/chathaway-codes/home-sensors/v2/internal/pipespy" + "github.com/chathaway-codes/home-sensors/v2/internal/watcher/config" + "github.com/google/uuid" + "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/pkg/media/ivfreader" + "github.com/rs/zerolog/log" +) + +var Default = &Mod{} + +type Video struct { + mu sync.Mutex + h264Cmd, ivfCmd *exec.Cmd + ivfListeners map[string]chan<- []byte + ivfCodecReady chan struct{} + ivfCodec string + cancelFunc func() + ctx context.Context +} + +func New(cfg *config.Config) (*Video, error) { + ctx, cancelFunc := context.WithCancel(context.Background()) + // Setup commands + h264cmd := exec.CommandContext(ctx, cfg.H264Cmd.Binary, cfg.H264Cmd.Arguments...) + ivfCmd := exec.CommandContext(ctx, cfg.IVFCmd.Binary, cfg.IVFCmd.Arguments...) + + return &Video{ + h264Cmd: h264cmd, + ivfCmd: ivfCmd, + ivfListeners: make(map[string]chan<- []byte), + ivfCodecReady: make(chan struct{}), + cancelFunc: cancelFunc, + ctx: ctx, + }, nil +} + +// Run launches the commands and begins creating data. It will block until Done is called. +func (v *Video) Run() { + pipe := pipespy.New() + + pipe.Add(pipespy.NewCmd(v.h264Cmd)) + ivfSnoop := pipe.Add(pipespy.NewCmd(v.ivfCmd)).Snoop() + + log.Info().Str("cmd", v.h264Cmd.String()).Msg("h264 command") + log.Info().Str("cmd", v.ivfCmd.String()).Msg("ivf command") + + // Log stderr if it appears + go logToStdErr("h264", &v.h264Cmd.Stderr) + go logToStdErr("ivf", &v.ivfCmd.Stderr) + + cleanUp := pipe.Start() + defer func() { + errs := cleanUp() + for _, err := range errs { + log.Err(err).Send() + } + }() + + go func(r io.Reader) { + ivf, header, err := ivfreader.NewWith(r) + if err != nil { + log.Error().Err(err).Msg("failed to create ivfreader") + return + } + + // Determine video codec + var trackCodec string + switch header.FourCC { + case "AV01": + trackCodec = webrtc.MimeTypeAV1 + case "VP90": + trackCodec = webrtc.MimeTypeVP9 + case "VP80": + trackCodec = webrtc.MimeTypeVP8 + default: + log.Error().Err(err).Str("fourcc", header.FourCC).Msg("unable to handle FourCC") + } + + v.mu.Lock() + v.ivfCodec = trackCodec + close(v.ivfCodecReady) + v.mu.Unlock() + + log.Info().Msgf("starting to stream IVF with codec %q", trackCodec) + + for { + select { + case <-v.ctx.Done(): + // Exit cleanly + return + default: + // do nothing + } + frame, _, ivfErr := ivf.ParseNextFrame() + if errors.Is(ivfErr, io.EOF) { + log.Debug().Msg("all video frames parsed and sent") + return + } + + if ivfErr != nil { + log.Error().Err(err).Msg("failed to parse frame") + } + + v.mu.Lock() + for _, lis := range v.ivfListeners { + lis <- frame + } + v.mu.Unlock() + } + }(ivfSnoop) + log.Info().Msg("video streams started") + + <-v.ctx.Done() +} + +// Join will connect to a running stream; note, it will block +// until the ivf codec is decided. Make sure to call Run first. +func (v *Video) Join() (<-chan []byte, string, func()) { + <-v.ivfCodecReady + v.mu.Lock() + defer v.mu.Unlock() + + myID := uuid.New().String() + ch := make(chan []byte) + v.ivfListeners[myID] = ch + + return ch, v.ivfCodec, func() { + v.mu.Lock() + defer v.mu.Unlock() + + delete(v.ivfListeners, myID) + } +} + +func logToStdErr(name string, w *io.Writer) { + pipeReader, pipeWriter := io.Pipe() + *w = pipeWriter + + lineReader := bufio.NewScanner(pipeReader) + + for lineReader.Scan() { + log.Info().Str("video", name).Str("stderr", lineReader.Text()).Send() + } +} + +// Done stops the processing. +func (v *Video) Done() { + v.cancelFunc() +} + +type Mod struct{} + +func (m *Mod) Get() (*Video, error) { + cfg, err := config.Default.Get() + if err != nil { + return nil, err + } + return New(cfg) +} diff --git a/internal/watcher/config/config.go b/internal/watcher/config/config.go new file mode 100644 index 0000000..1c8e48d --- /dev/null +++ b/internal/watcher/config/config.go @@ -0,0 +1,57 @@ +// Package config handles loading the config. +package config + +import ( + "flag" + "fmt" + "os" + + "gopkg.in/yaml.v3" +) + +var Default *Mod + +type Cmd struct { + Binary string + Arguments []string +} + +type Config struct { + H264Cmd *Cmd `yaml:"h264"` + IVFCmd *Cmd `yaml:"ivf"` +} + +func New(source []byte) (*Config, error) { + config := &Config{} + if err := yaml.Unmarshal(source, config); err != nil { + return nil, fmt.Errorf("failed to parse config: %w", err) + } + return config, nil +} + +type Mod struct { + filePath string +} + +func (m *Mod) RegisterFlags(fs *flag.FlagSet) { + fs.StringVar(&m.filePath, "watcher_config", "", "path to the watcher configuration") +} + +func (m *Mod) Get() (*Config, error) { + bytes, err := os.ReadFile(m.filePath) + if err != nil { + return nil, fmt.Errorf("failed to read file %q: %w", m.filePath, err) + } + + config, err := New(bytes) + if err != nil { + return nil, err + } + + return config, nil +} + +func init() { + Default = &Mod{} + Default.RegisterFlags(flag.CommandLine) +} diff --git a/sample.yaml b/sample.yaml new file mode 100644 index 0000000..d5c8cc1 --- /dev/null +++ b/sample.yaml @@ -0,0 +1,16 @@ +h264: + binary: "/usr/bin/cat" + arguments: + - "/home/charles/Downloads/simpsons_movie_1080p_hddvd_trailer/The Simpsons Movie - 1080p Trailer.mp4" +ivf: + binary: "/usr/bin/ffmpeg" + arguments: + - "-i" + - "-" + - "-g" + - "30" + - "-b:v" + - "2M" + - "-f" + - "ivf" + - "-"