package sensors import ( "bufio" "context" "os/exec" "strconv" "strings" "sync" "time" pb "github.com/chathaway-codes/home-sensors/v2/gen" "github.com/chathaway-codes/home-sensors/v2/internal/pipespy" "github.com/chathaway-codes/home-sensors/v2/internal/watcher/config" "github.com/google/uuid" "github.com/rs/zerolog/log" ) var Default = &Mod{} type Sensors struct { mu sync.Mutex ctx context.Context readCmd *exec.Cmd cancelFunc func() ticker time.Ticker listeners map[string]chan<- *pb.Sample } func New(cfg *config.Config) (*Sensors, error) { ctx, cancelFunc := context.WithCancel(context.Background()) readCmd := exec.CommandContext(ctx, cfg.SensorCmd.Binary, cfg.SensorCmd.Arguments...) ticker := time.NewTicker(time.Duration(cfg.SensorRateMS) * time.Millisecond) return &Sensors{ ctx: ctx, cancelFunc: cancelFunc, readCmd: readCmd, ticker: *ticker, // it would be better if sensors.Run handled sending to the server; to do that, // it needs a connection and the auth token. I don't have a clean way of getting that // right now. listeners: make(map[string]chan<- *pb.Sample), }, nil } func (v *Sensors) Run() { pipe := pipespy.New() snoop := pipe.Add(pipespy.NewCmd(v.readCmd)).Snoop() defer snoop.Close() cleanUp := pipe.Start() defer func() { errs := cleanUp() for _, err := range errs { log.Err(err).Send() } }() scanner := bufio.NewScanner(snoop) for scanner.Scan() { line := scanner.Text() parts := strings.Split(line, " ") if len(parts) != 3 { log.Error().Str("line", line).Msg("malformed line; expected 3 floats: humidity pressure temperature_c") } humidity, err := strconv.ParseFloat(parts[0], 64) if err != nil { log.Error().Err(err).Str("val", parts[0]).Msg("failed to parse humidity") } pressure, err := strconv.ParseFloat(parts[1], 64) if err != nil { log.Error().Err(err).Str("val", parts[1]).Msg("failed to parse pressure") } temperatureC, err := strconv.ParseFloat(parts[2], 64) if err != nil { log.Error().Err(err).Str("val", parts[2]).Msg("failed to parse temperature_c") } select { case <-v.ticker.C: func() { v.mu.Lock() defer v.mu.Unlock() samples := []*pb.Sample{ { Type: pb.Sample_HUMIDITY, Reading: humidity, }, { Type: pb.Sample_PRESSURE, Reading: pressure, }, { Type: pb.Sample_TEMPERATURE_C, Reading: temperatureC, }, } for _, listener := range v.listeners { for _, sample := range samples { listener <- sample } } }() default: // do nothing } } } // Join will connect to a running stream. func (v *Sensors) Join() (<-chan *pb.Sample, func()) { v.mu.Lock() defer v.mu.Unlock() myID := uuid.New().String() ch := make(chan *pb.Sample) v.listeners[myID] = ch return ch, func() { v.mu.Lock() defer v.mu.Unlock() delete(v.listeners, myID) } } // Done stops the processing. func (v *Sensors) Done() { v.cancelFunc() } type Mod struct{} func (m *Mod) Get() (*Sensors, error) { cfg, err := config.Default.Get() if err != nil { return nil, err } return New(cfg) }