diff --git a/cmd/watcher/watcher.go b/cmd/watcher/watcher.go index 5bca272..4483310 100644 --- a/cmd/watcher/watcher.go +++ b/cmd/watcher/watcher.go @@ -8,6 +8,7 @@ import ( "flag" "fmt" "net/http" + "sync" "time" "connectrpc.com/connect" @@ -19,6 +20,8 @@ import ( "github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3/pkg/media" "github.com/rs/zerolog/log" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" ) @@ -46,18 +49,6 @@ func main() { fmt.Sprintf("https://%s/", *signalerServer), connect.WithGRPC(), ) - authToken, err := client.CreateAuthToken(ctx, connect.NewRequest(&pb.CreateAuthTokenRequest{ - Home: cfg.HomeName, - Type: &pb.CreateAuthTokenRequest_Camera_{ - Camera: &pb.CreateAuthTokenRequest_Camera{ - Id: cfg.CameraName, - }, - }, - })) - if err != nil { - log.Fatal().Err(err).Msg("failed to get auth token") - } - token := authToken.Msg.GetToken() vid, err := h264video.Default.Get() if err != nil { @@ -78,6 +69,11 @@ func main() { sensorCh, sensorDone := sensors.Join() defer sensorDone() + token, err := getAuthToken(ctx, client, cfg) + if err != nil { + log.Fatal().Err(err).Msg("failed to get auth token") + } + go handleSensor(ctx, client, token, sensorCh) // Create a new RTCPeerConnection @@ -87,6 +83,14 @@ func main() { // Wait for a session request session, err := client.PopSession(ctx, withAuth(token, &pb.PopSessionRequest{})) if err != nil { + if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { + // try getting a new token + token, err = getAuthToken(ctx, client, cfg) + if err != nil { + log.Fatal().Err(err).Msg("failed to recreate auth token") + } + continue + } log.Error().Err(err).Msg("error creating session") continue } @@ -94,6 +98,22 @@ func main() { } } +func getAuthToken(ctx context.Context, client servicepb.SignalerServiceClient, cfg *config.Config) (string, error) { + authToken, err := client.CreateAuthToken(ctx, connect.NewRequest(&pb.CreateAuthTokenRequest{ + Home: cfg.HomeName, + Type: &pb.CreateAuthTokenRequest_Camera_{ + Camera: &pb.CreateAuthTokenRequest_Camera{ + Id: cfg.CameraName, + }, + }, + })) + if err != nil { + return "", fmt.Errorf("CreateAuthToken failed: %w", err) + } + token := authToken.Msg.GetToken() + return token, nil +} + func handleSensor(ctx context.Context, client servicepb.SignalerServiceClient, token string, ch <-chan *pb.Sample) { for { var sample *pb.Sample @@ -111,76 +131,12 @@ func handleSensor(ctx context.Context, client servicepb.SignalerServiceClient, t } } -func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, token string, session *connect.Response[pb.Session], vid *h264video.Video) { - var err error - log.Debug().Msg("new session") - - peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{ - ICEServers: []webrtc.ICEServer{ - { - URLs: []string{"stun:stun.l.google.com:19302"}, - }, - }, - }) - if err != nil { - log.Info().Err(err).Msg("failed to get a connection") - return - } - - // connect to the video stream; the cleanup is done in the goroutine which - // consumes the framess - ch, trackCodec, cleanUp := vid.Join() - // Create a video track - videoTrack, videoTrackErr := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: trackCodec}, "video", "pion") - if videoTrackErr != nil { - log.Info().Err(videoTrackErr).Msg("Failed to create video track") - return - } - - rtpSender, err := peerConnection.AddTrack(videoTrack) - if err != nil { - log.Info().Err(err).Msg("Failed to add track to connection") - } - +func getIceConnection(ctx context.Context, peerConnection *webrtc.PeerConnection, client servicepb.SignalerServiceClient, token string, session *connect.Response[pb.Session]) (context.Context, error) { // We use the cancel func to signal that the stream is ready iceConnectedCtx, iceConnectedCtxCancel := context.WithCancel(context.Background()) - defer func() { - if err := peerConnection.Close(); err != nil { - log.Debug().Err(err).Msg("cannot close peerConnection") - } - }() - - // Read incoming RTCP packets - // Before these packets are returned they are processed by interceptors. For things - // like NACK this needs to be called. - go func() { - rtcpBuf := make([]byte, 1500) - for { - if _, _, err := rtpSender.Read(rtcpBuf); err != nil { - return - } - } - }() - - go func() { - defer cleanUp() - readyToSend := false - for frame := range ch { - select { - case <-iceConnectedCtx.Done(): - readyToSend = true - default: - // do nothing - } - if !readyToSend { - continue - } - if err := videoTrack.WriteSample(media.Sample{Data: frame, Duration: time.Millisecond * 33}); err != nil { - log.Err(err).Msg("failed to write sample") - return - } - } - }() + iceDisconnectedCtx, iceDeconnectedCtxCancel := context.WithCancel(ctx) + readyToSend := sync.WaitGroup{} + readyToSend.Add(1) // Set the handler for ICE connection state // This will notify you when the peer has connected/disconnected @@ -193,7 +149,6 @@ func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, // Set the handler for Peer connection state // This will notify you when the peer has connected/disconnected - exitCh := make(chan struct{}) peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { log.Debug().Msgf("Peer Connection State has changed: %s\n", s.String()) @@ -201,15 +156,12 @@ func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. - close(exitCh) - return - } - if s == webrtc.PeerConnectionStateDisconnected { - close(exitCh) + iceDeconnectedCtxCancel() } }) peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) { + readyToSend.Wait() if i == nil { if _, err := client.CreateIceMessage(ctx, withAuth(token, &pb.CreateIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), @@ -226,6 +178,7 @@ func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, if c.UsernameFragment != nil { usernameFragment = proto.String(*c.UsernameFragment) } + log.Info().Msgf("got candidate %+v", c) client.CreateIceMessage(ctx, withAuth(token, &pb.CreateIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), IceMessage: &pb.IceMessage{ @@ -240,85 +193,203 @@ func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, }, })) }) - log.Info().Msg("Spawning helper") - // helper which sends answers, waits for + // Get the offer from the other + msg, err := client.PopIceMessage(iceDisconnectedCtx, withAuth(token, &pb.PopIceMessageRequest{ + SessionIdentifier: session.Msg.GetId(), + })) + if err != nil { + log.Info().Err(err).Msg("failed to pop ice message") + } + iceSession := msg.Msg.GetSession() + + switch iceSession.SdpType { + case int64(webrtc.SDPTypeOffer): + offer := webrtc.SessionDescription{ + Type: webrtc.SDPType(iceSession.SdpType), + SDP: iceSession.Sdp, + } + + if err := peerConnection.SetRemoteDescription(offer); err != nil { + log.Warn().Err(err).Msg("failed to set remote description") + } + default: + log.Info().Msgf("unexpected sdp type: %v", webrtc.SDPType(iceSession.SdpType).String()) + } + log.Info().Msg("Accepted promise!") + + // Send back an answer + answer, err := peerConnection.CreateAnswer(nil) + if err != nil { + log.Debug().Err(err).Msg("Candidate failed") + } + + if err := peerConnection.SetLocalDescription(answer); err != nil { + log.Info().Err(err).Msg("Failed to set local description") + } + + _, err = client.CreateIceMessage(iceDisconnectedCtx, withAuth(token, &pb.CreateIceMessageRequest{ + SessionIdentifier: session.Msg.GetId(), + IceMessage: &pb.IceMessage{ + Type: &pb.IceMessage_Session{ + Session: &pb.IceSessionDescription{ + SdpType: int64(answer.Type), + Sdp: answer.SDP, + }, + }, + }, + })) + if err != nil { + log.Info().Err(err).Msg("Failed to send answer") + } + readyToSend.Done() + + // TODO: do we add the video right here? + + // Go into a loop processing ice candidates // Add ICE candidates from remote - for { - select { - case <-exitCh: - return - default: - // check for another message - } - msg, err := client.PopIceMessage(ctx, withAuth(token, &pb.PopIceMessageRequest{ - SessionIdentifier: session.Msg.GetId(), - })) - if err != nil { - log.Info().Err(err).Msg("failed to pop ice message") - continue - } - switch msg.Msg.Type.(type) { - case *pb.IceMessage_Candidate: - candidate := msg.Msg.GetCandidate() - var sdpMLine *uint16 - if candidate.SdpLineIndex != nil { - t := uint16(candidate.GetSdpLineIndex()) - sdpMLine = &t + go func() { + for { + select { + case <-iceDisconnectedCtx.Done(): + return + default: + // check for another message } - if err := peerConnection.AddICECandidate(webrtc.ICECandidateInit{ - Candidate: candidate.GetCandidate(), - SDPMid: candidate.SdpMid, - SDPMLineIndex: sdpMLine, - }); err != nil { - log.Warn().Err(err).Msg("failed to add ice candidate") - } - - // Send back an answer - answer, err := peerConnection.CreateAnswer(nil) - if err != nil { - log.Debug().Msg("Candidate failed") - continue - } - - if err := peerConnection.SetLocalDescription(answer); err != nil { - log.Info().Err(err).Msg("Failed to set local description") - } - - _, err = client.CreateIceMessage(ctx, withAuth(token, &pb.CreateIceMessageRequest{ + msg, err := client.PopIceMessage(iceDisconnectedCtx, withAuth(token, &pb.PopIceMessageRequest{ SessionIdentifier: session.Msg.GetId(), - IceMessage: &pb.IceMessage{ - Type: &pb.IceMessage_Session{ - Session: &pb.IceSessionDescription{ - SdpType: int64(answer.Type), - Sdp: answer.SDP, - }, - }, - }, })) if err != nil { - log.Info().Err(err).Msg("Failed to send answer") + log.Info().Err(err).Msg("failed to pop ice message") + continue } - case *pb.IceMessage_Session: - iceSession := msg.Msg.GetSession() - - switch iceSession.SdpType { - case int64(webrtc.SDPTypeOffer): - offer := webrtc.SessionDescription{ - Type: webrtc.SDPType(iceSession.SdpType), - SDP: iceSession.Sdp, + switch msg.Msg.Type.(type) { + case *pb.IceMessage_Candidate: + candidate := msg.Msg.GetCandidate() + var sdpMLine *uint16 + if candidate.SdpLineIndex != nil { + t := uint16(candidate.GetSdpLineIndex()) + sdpMLine = &t } - - if err := peerConnection.SetRemoteDescription(offer); err != nil { - log.Warn().Err(err).Msg("failed to set remote description") + if err := peerConnection.AddICECandidate(webrtc.ICECandidateInit{ + Candidate: candidate.GetCandidate(), + SDPMid: candidate.SdpMid, + SDPMLineIndex: sdpMLine, + }); err != nil { + log.Warn().Err(err).Msg("failed to add ice candidate") } - default: - log.Info().Msgf("unexpected sdp type: %v", webrtc.SDPType(iceSession.SdpType).String()) + case *pb.IceMessage_Session: + + case *pb.IceMessage_NoMoreCandidates: + // do nothing } - log.Info().Msg("Accepted promise!") - case *pb.IceMessage_NoMoreCandidates: - // do nothing } + }() + + select { + case <-iceConnectedCtx.Done(): + // Success; return the connection + return iceDisconnectedCtx, nil + case <-iceDisconnectedCtx.Done(): + // No connection; return error + return ctx, fmt.Errorf("failed to create connection: ICE disconnected") } } + +func handleSession(ctx context.Context, client servicepb.SignalerServiceClient, token string, session *connect.Response[pb.Session], vid *h264video.Video) { + log.Debug().Msg("new session") + + // connect to the video stream; the cleanup is done in the goroutine which + // consumes the framess + ch, trackCodec, cleanUp := vid.Join() + defer cleanUp() + + // Create a video track + videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: trackCodec}, "video", "pion") + if err != nil { + log.Info().Err(err).Msg("Failed to create video track") + return + } + + peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ + { + URLs: []string{"stun:stun.l.google.com:19302"}, + }, + }, + }) + if err != nil { + log.Err(err).Msg("failed to make connection") + } + + rtpSender, err := peerConnection.AddTrack(videoTrack) + if err != nil { + log.Info().Err(err).Msg("Failed to add track to connection") + return + } + + disconnectedCtx, err := getIceConnection(ctx, peerConnection, client, token, session) + if err != nil { + log.Err(err).Msg("failed creating ice connection") + } + + log.Info().Msgf("State is %+v", rtpSender.Transport().State()) + + // Read incoming RTCP packets + // Before these packets are returned they are processed by interceptors. For things + // like NACK this needs to be called. + go func() { + rtcpBuf := make([]byte, 1500) + for { + select { + case <-disconnectedCtx.Done(): + return + default: + // do nothing + } + if _, _, err := rtpSender.Read(rtcpBuf); err != nil { + return + } + } + }() + + // Start a routine to send frames from a buffer + var frame []byte + var frameLock sync.Mutex + go func() { + ticker := time.NewTicker(time.Millisecond * 30) + for { + select { + case <-disconnectedCtx.Done(): + return + case <-ticker.C: + // do nothing + } + frameLock.Lock() + myFrame := frame + frame = nil + frameLock.Unlock() + if myFrame == nil { + continue + } + if err := videoTrack.WriteSample(media.Sample{Data: myFrame, Duration: time.Millisecond * 33}); err != nil { + log.Err(err).Msg("failed to write sample") + return + } + } + }() + + for myFrame := range ch { + select { + case <-disconnectedCtx.Done(): + return + default: + // do nothing + } + frameLock.Lock() + frame = myFrame + frameLock.Unlock() + } + // TODO: Video ended; close the connection +} diff --git a/dbuild/DEBIAN/control b/dbuild/DEBIAN/control new file mode 100644 index 0000000..d97beb7 --- /dev/null +++ b/dbuild/DEBIAN/control @@ -0,0 +1,5 @@ +Package: watcher +Version: 0.2 +Maintainer: Charles +Architecture: all +Description: Watches cameras and temp sensors diff --git a/dbuild/DEBIAN/postinst b/dbuild/DEBIAN/postinst new file mode 100755 index 0000000..24ca3ef --- /dev/null +++ b/dbuild/DEBIAN/postinst @@ -0,0 +1,7 @@ +#!/bin/bash + +systemctl daemon-reload +# Make sure its enabled +systemctl enable --now watcher +# Restart it; it might have already been installed and running +systemctl restart watcher diff --git a/dbuild/etc/systemd/system/watcher.service b/dbuild/etc/systemd/system/watcher.service new file mode 100644 index 0000000..b36abc9 --- /dev/null +++ b/dbuild/etc/systemd/system/watcher.service @@ -0,0 +1,14 @@ +# Install to /etc/systemd/system/watcher.service +[Unit] +Description=Run watcher +After=network-online.target +Wants=network-online.target systemd-networkd-wait-online.service + +[Service] +Type=exec +ExecStart=/usr/local/bin/watcher --watcher_config /etc/watcher_config.yaml --watcher_name /etc/watcher_name.txt +Restart=on-failure +RestartSec=5s + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/dbuild/etc/systemd/system/watcher.systemd b/dbuild/etc/systemd/system/watcher.systemd new file mode 100644 index 0000000..5a2015a --- /dev/null +++ b/dbuild/etc/systemd/system/watcher.systemd @@ -0,0 +1,14 @@ +# Install to /etc/systemd/system/watcher.service +[Unit] +Description=Run watcher +After=network-online.target +Wants=network-online.target systemd-networkd-wait-online.service + +[Service] +Type=exec +ExecStart=/usr/local/bin --watcher_config /etc/watcher.yaml +Restart=on-failure +RestartSec=5s + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/dbuild/etc/watcher_config.yaml b/dbuild/etc/watcher_config.yaml new file mode 100644 index 0000000..4eadb6a --- /dev/null +++ b/dbuild/etc/watcher_config.yaml @@ -0,0 +1,20 @@ +home: Sunnyvale +name: Office +h264: + binary: "/usr/bin/libcamera-vid" + arguments: + - "-n" + - "-t" + - "0" + - "--codec" + - "h264" + - "--mode" + - "1640:1232" + - "--inline" + - "-o" + - "-" +sensor: + binary: "/usr/bin/python3" + arguments: + - "/usr/local/bin/temperature.py" +sensor_rate_ms: 10000 \ No newline at end of file diff --git a/dbuild/usr/local/bin/temperature.py b/dbuild/usr/local/bin/temperature.py new file mode 100644 index 0000000..27fa4ba --- /dev/null +++ b/dbuild/usr/local/bin/temperature.py @@ -0,0 +1,17 @@ +import bme280 +import smbus2 +from time import sleep + +port = 1 +address = 0x77 # Adafruit BME280 address. Other BME280s may be different +bus = smbus2.SMBus(port) + +bme280.load_calibration_params(bus,address) + +while True: + bme280_data = bme280.sample(bus,address) + humidity = bme280_data.humidity + pressure = bme280_data.pressure + ambient_temperature = bme280_data.temperature + print(humidity, pressure, ambient_temperature) + sleep(1) diff --git a/dbuild/usr/local/bin/watcher b/dbuild/usr/local/bin/watcher new file mode 100755 index 0000000..c0b3775 Binary files /dev/null and b/dbuild/usr/local/bin/watcher differ diff --git a/internal/watcher/config/config.go b/internal/watcher/config/config.go index 95bfedd..74f8284 100644 --- a/internal/watcher/config/config.go +++ b/internal/watcher/config/config.go @@ -25,20 +25,23 @@ type Config struct { SensorRateMS int64 `yaml:"sensor_rate_ms"` } -func New(source []byte) (*Config, error) { +func New(source []byte, name string) (*Config, error) { config := &Config{} if err := yaml.Unmarshal(source, config); err != nil { return nil, fmt.Errorf("failed to parse config: %w", err) } + config.CameraName = name return config, nil } type Mod struct { filePath string + namePath string } func (m *Mod) RegisterFlags(fs *flag.FlagSet) { fs.StringVar(&m.filePath, "watcher_config", "", "path to the watcher configuration") + fs.StringVar(&m.namePath, "watcher_name", "/var/lib/dbus/machine-id", "location of the file to pull name from") } func (m *Mod) Get() (*Config, error) { @@ -47,7 +50,13 @@ func (m *Mod) Get() (*Config, error) { return nil, fmt.Errorf("failed to read file %q: %w", m.filePath, err) } - config, err := New(bytes) + // Override name using a unique ID; we can let users name things in the app + myID, err := os.ReadFile(m.namePath) + if err != nil { + return nil, fmt.Errorf("failed to read machine ID from %q", m.namePath) + } + + config, err := New(bytes, string(myID)) if err != nil { return nil, err } diff --git a/pkg/signaler/signaler.go b/pkg/signaler/signaler.go index 9ff7550..e1f70d2 100644 --- a/pkg/signaler/signaler.go +++ b/pkg/signaler/signaler.go @@ -223,7 +223,16 @@ func (s *Server) PopSession(ctx context.Context, request *connect.Request[pb.Pop ch := s.sessionsByCamera[authToken.Uid] s.mu.Unlock() - sess := <-ch + var sess *session + tick := time.NewTicker(time.Second * 30) + defer tick.Stop() + select { + case sess = <-ch: + // OK + case <-tick.C: + // have them retry + return nil, connect.NewError(connect.CodeDeadlineExceeded, fmt.Errorf("try again")) + } if sess == nil { return nil, status.Errorf(codes.DataLoss, "someone else stole the session") diff --git a/prod.yaml b/prod.yaml index 74a30b3..9850287 100644 --- a/prod.yaml +++ b/prod.yaml @@ -17,7 +17,7 @@ spec: spec: containers: - name: signaler - image: us-central1-docker.pkg.dev/home-sensors-400805/signaler/image:20231003-0032 + image: us-central1-docker.pkg.dev/home-sensors-400805/signaler/image:20240110-2245 command: - /signaler ports: @@ -81,3 +81,10 @@ spec: domains: - home.chathaway.codes - www.home.chathaway.codes +--- +apiVersion: cloud.google.com/v1 +kind: BackendConfig +metadata: + name: my-bsc-backendconfig +spec: + timeoutSec: 40 \ No newline at end of file diff --git a/rpi_camera.yaml b/rpi_camera.yaml index c93b0a8..4eadb6a 100644 --- a/rpi_camera.yaml +++ b/rpi_camera.yaml @@ -10,25 +10,11 @@ h264: - "h264" - "--mode" - "1640:1232" - - "--denoise" - - "off" - "--inline" - "-o" - "-" -ivf: - binary: "/usr/bin/ffmpeg" - arguments: - - "-i" - - "-" - - "-g" - - "30" - - "-b:v" - - "2M" - - "-f" - - "ivf" - - "-" sensor: binary: "/usr/bin/python3" arguments: - - "/home/charles/temperature.py" + - "/usr/local/bin/temperature.py" sensor_rate_ms: 10000 \ No newline at end of file diff --git a/scripts/build_deb.sh b/scripts/build_deb.sh new file mode 100755 index 0000000..f5991a5 --- /dev/null +++ b/scripts/build_deb.sh @@ -0,0 +1,34 @@ +#!/bin/bash + +mkdir -p dbuild/usr/local/bin +mkdir -p dbuild/etc/ +mkdir -p dbuild/etc/systemd/system/ +mkdir -p dbuild/DEBIAN + +cat < dbuild/DEBIAN/control +Package: watcher +Version: 0.2 +Maintainer: Charles +Architecture: all +Description: Watches cameras and temp sensors +EOF + +cat < dbuild/DEBIAN/postinst +#!/bin/bash + +systemctl daemon-reload +# Make sure its enabled +systemctl enable --now watcher +# Restart it; it might have already been installed and running +systemctl restart watcher +EOF +chmod +x dbuild/DEBIAN/postinst + +GOOS=linux GOARCH=arm64 go build -o dbuild/usr/local/bin/watcher ./cmd/watcher + +cp ./rpi_camera.yaml dbuild/etc/watcher_config.yaml +cp ./watcher.systemd dbuild/etc/systemd/system/watcher.service +cp ./temperature.py dbuild/usr/local/bin/ + + +dpkg-deb --build dbuild \ No newline at end of file diff --git a/ui/android/app/src/main/AndroidManifest.xml b/ui/android/app/src/main/AndroidManifest.xml index 9909624..2c24879 100644 --- a/ui/android/app/src/main/AndroidManifest.xml +++ b/ui/android/app/src/main/AndroidManifest.xml @@ -31,5 +31,6 @@ android:value="2" /> + diff --git a/ui/lib/call.dart b/ui/lib/call.dart index c9efa40..d469fc6 100644 --- a/ui/lib/call.dart +++ b/ui/lib/call.dart @@ -38,15 +38,16 @@ class CallState extends State { logger.i("Init remote renderer"); await _remoteRenderer.initialize(); logger.i("Creating session"); - await _createSesson(); + await _createSession(); } - _createSesson() async { + _createSession() async { var callOptions = CallOptions(metadata: { 'Authorization': await widget.sessionService.getAuthToken(widget.home) }); var cancelCreate = Completer(); + var sendIceCandidates = Completer(); var clientSession = await widget.client.createSession( pb.CreateSessionRequest( @@ -73,6 +74,8 @@ class CallState extends State { }; peerConnection.onIceCandidate = (candidate) async { + await sendIceCandidates.future; + logger.i("Sending ICE candidate"); if (candidate.candidate == null) { await widget.client.createIceMessage( CreateIceMessageRequest( @@ -99,16 +102,15 @@ class CallState extends State { }; peerConnection.onIceConnectionState = (state) { - statusLine = "Ice state now $state"; + statusLine = "$state"; setState(() {}); - logger.i("Ice state now $state"); switch (state) { - case RTCIceConnectionState.RTCIceConnectionStateClosed: - case RTCIceConnectionState.RTCIceConnectionStateDisconnected: + //case RTCIceConnectionState.RTCIceConnectionStateClosed: + //case RTCIceConnectionState.RTCIceConnectionStateDisconnected: case RTCIceConnectionState.RTCIceConnectionStateFailed: cancelCreate.complete(CallCancelled()); - _connect(); + //_connect(); default: // do nothing } @@ -136,7 +138,6 @@ class CallState extends State { var offer = await peerConnection.createOffer(); await peerConnection.setLocalDescription(offer); // Send offer through signaling server - logger.i("Offer is $offer"); await widget.client.createIceMessage( pb.CreateIceMessageRequest( sessionIdentifier: clientSession.id, @@ -149,6 +150,20 @@ class CallState extends State { ), options: callOptions); + // Expect back a response + var someResponse = await Future.any([ + widget.client.popIceMessage( + pb.PopIceMessageRequest(sessionIdentifier: clientSession.id), + options: callOptions), + cancelCreate.future, + ]); + var resp = someResponse as pb.IceMessage; + var session = resp.session; + await peerConnection + .setRemoteDescription(RTCSessionDescription(session.sdp, "answer")); + + sendIceCandidates.complete(); + // Get candidates from remote while (true) { var someResponse = await Future.any([ @@ -168,10 +183,6 @@ class CallState extends State { resp.candidate.sdpLineIndex)); } else if (resp.hasNoMoreCandidates()) { logger.i("No more candidates from remote"); - } else if (resp.hasSession()) { - var session = resp.session; - await peerConnection - .setRemoteDescription(RTCSessionDescription(session.sdp, "answer")); break; } } @@ -183,7 +194,7 @@ class CallState extends State { Text(widget.cameraID.id), Text(statusLine), SizedBox( - height: 480, + height: 320, child: _ready ? RTCVideoView(_remoteRenderer) : const Text("Loading...")), diff --git a/ui/lib/main.dart b/ui/lib/main.dart index ef3e5b6..a3f3f7f 100644 --- a/ui/lib/main.dart +++ b/ui/lib/main.dart @@ -1,3 +1,5 @@ +import 'dart:collection'; + import 'package:flutter/material.dart'; //import 'package:grpc/grpc_web.dart'; import 'package:grpc/grpc.dart'; @@ -36,9 +38,9 @@ class MyApp extends StatelessWidget { @override Widget build(BuildContext context) { return MaterialApp( - title: 'Flutter Demo', - theme: ThemeData( - colorScheme: ColorScheme.fromSeed(seedColor: Colors.deepPurple), + title: 'Home Sensors', + theme: ThemeData.dark( + // colorScheme: ColorScheme.fromSeed(seedColor: Colors.black87), useMaterial3: true, ), home: MyHomePage( @@ -75,8 +77,8 @@ class MyHomePage extends StatefulWidget { class _MyHomePageState extends State { String topMessage = "Creating session..."; - List camerasToRender = []; - List samples = []; + List camerasToRender = []; + Map cameraSamples = {}; @override void initState() { @@ -100,8 +102,11 @@ class _MyHomePageState extends State { .listSamples(ListSamplesRequest(), options: callOptions); for (var sample in resp.samples) { - samples - .add(Text("${sample.type}: ${sample.reading} on ${sample.cameraId}")); + if (sample.type == Sample_Type.TEMPERATURE_C) { + var reading = (sample.reading * 9.0 / 5.0) + 32; + cameraSamples[sample.cameraId.id] = + Text("${reading.toStringAsFixed(2)} f"); + } } setState(() {}); } @@ -113,12 +118,22 @@ class _MyHomePageState extends State { var cameras = await widget.client .listCameras(ListCamerasRequest(), options: callOptions); + cameras.cameras.sort((a, b) => a.identifier.id.compareTo(b.identifier.id)); + camerasToRender = []; for (var camera in cameras.cameras) { - camerasToRender.add(Call( - widget.client, - widget.sessionService, - cameraID: camera.identifier, - home: widget.home, + List children = [ + Call( + widget.client, + widget.sessionService, + cameraID: camera.identifier, + home: widget.home, + ), + ]; + if (cameraSamples.containsKey(camera.identifier.id)) { + children.add(cameraSamples[camera.identifier.id]!); + } + camerasToRender.add(Column( + children: children, )); } setState(() {}); @@ -134,12 +149,7 @@ class _MyHomePageState extends State { // than having to individually change instances of widgets. return Scaffold( appBar: AppBar( - // TRY THIS: Try changing the color here to a specific color (to - // Colors.amber, perhaps?) and trigger a hot reload to see the AppBar - // change color while the other colors stay the same. backgroundColor: Theme.of(context).colorScheme.inversePrimary, - // Here we take the value from the MyHomePage object that was created by - // the App.build method, and use it to set our appbar title. title: Text(widget.title), ), body: SingleChildScrollView( @@ -150,7 +160,6 @@ class _MyHomePageState extends State { children: [ Text(topMessage), ] + - samples + camerasToRender, ), )); diff --git a/watcher.systemd b/watcher.systemd new file mode 100644 index 0000000..b36abc9 --- /dev/null +++ b/watcher.systemd @@ -0,0 +1,14 @@ +# Install to /etc/systemd/system/watcher.service +[Unit] +Description=Run watcher +After=network-online.target +Wants=network-online.target systemd-networkd-wait-online.service + +[Service] +Type=exec +ExecStart=/usr/local/bin/watcher --watcher_config /etc/watcher_config.yaml --watcher_name /etc/watcher_name.txt +Restart=on-failure +RestartSec=5s + +[Install] +WantedBy=multi-user.target \ No newline at end of file