// Package h264video implements logic to stream video from a config. package h264video import ( "bufio" "context" "errors" "io" "os/exec" "sync" "time" "github.com/chathaway-codes/home-sensors/v2/internal/pipespy" "github.com/chathaway-codes/home-sensors/v2/internal/watcher/config" "github.com/google/uuid" "github.com/pion/webrtc/v3" "github.com/pion/webrtc/v4/pkg/media/h264reader" "github.com/rs/zerolog/log" ) var Default = &Mod{} type Video struct { mu sync.Mutex h264Cmd *exec.Cmd ivfListeners map[string]chan<- []byte ivfCodecReady chan struct{} cancelFunc func() ctx context.Context } func New(cfg *config.Config) (*Video, error) { ctx, cancelFunc := context.WithCancel(context.Background()) // Setup commands h264cmd := exec.CommandContext(ctx, cfg.H264Cmd.Binary, cfg.H264Cmd.Arguments...) return &Video{ h264Cmd: h264cmd, ivfListeners: make(map[string]chan<- []byte), ivfCodecReady: make(chan struct{}), cancelFunc: cancelFunc, ctx: ctx, }, nil } // Run launches the commands and begins creating data. It will block until Done is called. func (v *Video) Run() { pipe := pipespy.New() h264snoop := pipe.Add(pipespy.NewCmd(v.h264Cmd)).Snoop() log.Info().Str("cmd", v.h264Cmd.String()).Msg("h264 command") // Log stderr if it appears go logToStdErr("h264", &v.h264Cmd.Stderr) cleanUp := pipe.Start() defer func() { errs := cleanUp() for _, err := range errs { log.Err(err).Send() } }() go func(r io.Reader) { rdr, err := h264reader.NewReader(r) if err != nil { log.Error().Err(err).Msg("failed to create ivfreader") return } v.mu.Lock() close(v.ivfCodecReady) v.mu.Unlock() log.Info().Msgf("starting to stream H264") ticker := time.NewTicker(time.Millisecond * 33) defer ticker.Stop() for ; true; <-ticker.C { select { case <-v.ctx.Done(): // Exit cleanly return default: // do nothing } frame, ivfErr := rdr.NextNAL() if errors.Is(ivfErr, io.EOF) { log.Debug().Msg("all video frames parsed and sent") return } if ivfErr != nil { log.Error().Err(ivfErr).Msg("failed to parse frame") } if frame == nil { log.Debug().Msg("all video frames parsed and sent") return } v.mu.Lock() for _, lis := range v.ivfListeners { lis <- frame.Data } v.mu.Unlock() } }(h264snoop) log.Info().Msg("video streams started") <-v.ctx.Done() } // Join will connect to a running stream; note, it will block // until the ivf codec is decided. Make sure to call Run first. func (v *Video) Join() (<-chan []byte, string, func()) { <-v.ivfCodecReady v.mu.Lock() defer v.mu.Unlock() myID := uuid.New().String() ch := make(chan []byte) v.ivfListeners[myID] = ch return ch, webrtc.MimeTypeH264, func() { v.mu.Lock() defer v.mu.Unlock() delete(v.ivfListeners, myID) } } func logToStdErr(name string, w *io.Writer) { pipeReader, pipeWriter := io.Pipe() *w = pipeWriter lineReader := bufio.NewScanner(pipeReader) for lineReader.Scan() { log.Info().Str("video", name).Str("stderr", lineReader.Text()).Send() } } // Done stops the processing. func (v *Video) Done() { v.cancelFunc() } type Mod struct{} func (m *Mod) Get() (*Video, error) { cfg, err := config.Default.Get() if err != nil { return nil, err } return New(cfg) }