Skip to content

Commit 902af5e

Browse files
committed
Add wyoming module
1 parent 7fe23c7 commit 902af5e

File tree

12 files changed

+886
-6
lines changed

12 files changed

+886
-6
lines changed

internal/streams/play.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"github.com/AlexxIT/go2rtc/pkg/core"
88
)
99

10-
func (s *Stream) Play(source string) error {
10+
func (s *Stream) Play(urlOrProd any) error {
1111
s.mu.Lock()
1212
for _, producer := range s.producers {
1313
if producer.state == stateInternal && producer.conn != nil {
@@ -16,12 +16,18 @@ func (s *Stream) Play(source string) error {
1616
}
1717
s.mu.Unlock()
1818

19-
if source == "" {
20-
return nil
21-
}
22-
19+
var source string
2320
var src core.Producer
2421

22+
switch urlOrProd.(type) {
23+
case string:
24+
if source = urlOrProd.(string); source == "" {
25+
return nil
26+
}
27+
case core.Producer:
28+
src = urlOrProd.(core.Producer)
29+
}
30+
2531
for _, producer := range s.producers {
2632
if producer.conn == nil {
2733
continue

internal/wyoming/wyoming.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package wyoming
2+
3+
import (
4+
"net"
5+
6+
"github.com/AlexxIT/go2rtc/internal/app"
7+
"github.com/AlexxIT/go2rtc/internal/streams"
8+
"github.com/AlexxIT/go2rtc/pkg/core"
9+
"github.com/AlexxIT/go2rtc/pkg/wyoming"
10+
"github.com/rs/zerolog"
11+
)
12+
13+
func Init() {
14+
streams.HandleFunc("wyoming", wyoming.Dial)
15+
16+
// server
17+
var cfg struct {
18+
Mod map[string]struct {
19+
Listen string `yaml:"listen"`
20+
Name string `yaml:"name"`
21+
WakeURI string `yaml:"wake_uri"`
22+
VADThreshold float32 `yaml:"vad_threshold"`
23+
} `yaml:"wyoming"`
24+
}
25+
app.LoadConfig(&cfg)
26+
27+
log = app.GetLogger("wyoming")
28+
29+
for name, cfg := range cfg.Mod {
30+
stream := streams.Get(name)
31+
if stream == nil {
32+
log.Warn().Msgf("[wyoming] missing stream: %s", name)
33+
continue
34+
}
35+
36+
ln, err := net.Listen("tcp", cfg.Listen)
37+
if err != nil {
38+
log.Warn().Msgf("[wyoming] listen error: %s", err)
39+
continue
40+
}
41+
42+
if cfg.Name == "" {
43+
cfg.Name = name
44+
}
45+
46+
srv := wyoming.Server{
47+
Name: cfg.Name,
48+
VADThreshold: int16(1000 * cfg.VADThreshold), // 1.0 => 1000
49+
WakeURI: cfg.WakeURI,
50+
MicHandler: func(cons core.Consumer) error {
51+
if err := stream.AddConsumer(cons); err != nil {
52+
return err
53+
}
54+
// not best solution
55+
if i, ok := cons.(interface{ OnClose(func()) }); ok {
56+
i.OnClose(func() {
57+
stream.RemoveConsumer(cons)
58+
})
59+
}
60+
return nil
61+
},
62+
SndHandler: func(prod core.Producer) error {
63+
return stream.Play(prod)
64+
},
65+
}
66+
go srv.Serve(ln)
67+
}
68+
}
69+
70+
var log zerolog.Logger

main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"github.com/AlexxIT/go2rtc/internal/v4l2"
3939
"github.com/AlexxIT/go2rtc/internal/webrtc"
4040
"github.com/AlexxIT/go2rtc/internal/webtorrent"
41+
"github.com/AlexxIT/go2rtc/internal/wyoming"
4142
"github.com/AlexxIT/go2rtc/pkg/shell"
4243
)
4344

@@ -69,6 +70,7 @@ func main() {
6970
hass.Init() // hass source, Hass API server
7071
onvif.Init() // onvif source, ONVIF API server
7172
webtorrent.Init() // webtorrent source, WebTorrent module
73+
wyoming.Init()
7274

7375
// 5. Other sources
7476

pkg/core/codec.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ func ParseCodecString(s string) *Codec {
277277
codec.ClockRate = uint32(Atoi(ss[1]))
278278
}
279279
if len(ss) >= 3 {
280-
codec.Channels = uint16(Atoi(ss[1]))
280+
codec.Channels = uint8(Atoi(ss[1]))
281281
}
282282

283283
return &codec

pkg/pcm/pcm.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,3 +185,23 @@ func Transcode(dst, src *core.Codec) func([]byte) []byte {
185185
return writer(samples)
186186
}
187187
}
188+
189+
func ConsumerCodecs() []*core.Codec {
190+
return []*core.Codec{
191+
{Name: core.CodecPCML},
192+
{Name: core.CodecPCM},
193+
{Name: core.CodecPCMA},
194+
{Name: core.CodecPCMU},
195+
}
196+
}
197+
198+
func ProducerCodecs() []*core.Codec {
199+
return []*core.Codec{
200+
{Name: core.CodecPCML, ClockRate: 16000},
201+
{Name: core.CodecPCM, ClockRate: 16000},
202+
{Name: core.CodecPCML, ClockRate: 8000},
203+
{Name: core.CodecPCM, ClockRate: 8000},
204+
{Name: core.CodecPCMA, ClockRate: 8000},
205+
{Name: core.CodecPCMU, ClockRate: 8000},
206+
}
207+
}

pkg/pcm/s16le/s16le.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package s16le
2+
3+
func PeaksRMS(b []byte) int16 {
4+
// RMS of sine wave = peak / sqrt2
5+
// https://en.wikipedia.org/wiki/Root_mean_square
6+
// https://www.youtube.com/watch?v=MUDkL4KZi0I
7+
var peaks int32
8+
var peaksSum int32
9+
var prevSample int16
10+
var prevUp bool
11+
12+
var i int
13+
for n := len(b); i < n; {
14+
lo := b[i]
15+
i++
16+
hi := b[i]
17+
i++
18+
19+
sample := int16(hi)<<8 | int16(lo)
20+
up := sample >= prevSample
21+
22+
if i >= 4 {
23+
if up != prevUp {
24+
if prevSample >= 0 {
25+
peaksSum += int32(prevSample)
26+
} else {
27+
peaksSum -= int32(prevSample)
28+
}
29+
peaks++
30+
}
31+
}
32+
33+
prevSample = sample
34+
prevUp = up
35+
}
36+
37+
if peaks == 0 {
38+
return 0
39+
}
40+
41+
return int16(peaksSum / peaks)
42+
}

pkg/wyoming/api.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package wyoming
2+
3+
import (
4+
"bufio"
5+
"encoding/json"
6+
"io"
7+
"net"
8+
9+
"github.com/AlexxIT/go2rtc/pkg/core"
10+
)
11+
12+
type API struct {
13+
conn net.Conn
14+
rd *bufio.Reader
15+
}
16+
17+
func DialAPI(address string) (*API, error) {
18+
conn, err := net.DialTimeout("tcp", address, core.ConnDialTimeout)
19+
if err != nil {
20+
return nil, err
21+
}
22+
23+
return NewAPI(conn), nil
24+
}
25+
26+
const Version = "1.5.4"
27+
28+
func NewAPI(conn net.Conn) *API {
29+
return &API{conn: conn, rd: bufio.NewReader(conn)}
30+
}
31+
32+
func (w *API) WriteEvent(evt *Event) (err error) {
33+
hdr := EventHeader{
34+
Type: evt.Type,
35+
Version: Version,
36+
DataLength: len(evt.Data),
37+
PayloadLength: len(evt.Payload),
38+
}
39+
40+
buf, err := json.Marshal(hdr)
41+
if err != nil {
42+
return err
43+
}
44+
45+
buf = append(buf, '\n')
46+
buf = append(buf, evt.Data...)
47+
buf = append(buf, evt.Payload...)
48+
49+
_, err = w.conn.Write(buf)
50+
return err
51+
}
52+
53+
func (w *API) ReadEvent() (*Event, error) {
54+
data, err := w.rd.ReadBytes('\n')
55+
if err != nil {
56+
return nil, err
57+
}
58+
59+
var hdr EventHeader
60+
if err = json.Unmarshal(data, &hdr); err != nil {
61+
return nil, err
62+
}
63+
64+
evt := Event{Type: hdr.Type}
65+
66+
if hdr.DataLength > 0 {
67+
evt.Data = make([]byte, hdr.DataLength)
68+
if _, err = io.ReadFull(w.rd, evt.Data); err != nil {
69+
return nil, err
70+
}
71+
}
72+
73+
if hdr.PayloadLength > 0 {
74+
evt.Payload = make([]byte, hdr.PayloadLength)
75+
if _, err = io.ReadFull(w.rd, evt.Payload); err != nil {
76+
return nil, err
77+
}
78+
}
79+
80+
return &evt, nil
81+
}
82+
83+
func (w *API) Close() error {
84+
return w.conn.Close()
85+
}
86+
87+
type Event struct {
88+
Type string
89+
Data []byte
90+
Payload []byte
91+
}
92+
93+
type EventHeader struct {
94+
Type string `json:"type"`
95+
Version string `json:"version"`
96+
DataLength int `json:"data_length,omitempty"`
97+
PayloadLength int `json:"payload_length,omitempty"`
98+
}

pkg/wyoming/backchannel.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package wyoming
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"github.com/AlexxIT/go2rtc/pkg/core"
8+
"github.com/pion/rtp"
9+
)
10+
11+
type Backchannel struct {
12+
core.Connection
13+
api *API
14+
}
15+
16+
func (b *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
17+
return nil, core.ErrCantGetTrack
18+
}
19+
20+
func (b *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
21+
sender := core.NewSender(media, codec)
22+
sender.Handler = func(pkt *rtp.Packet) {
23+
ts := time.Now().Nanosecond()
24+
evt := &Event{
25+
Type: "audio-chunk",
26+
Data: []byte(fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, ts)),
27+
Payload: pkt.Payload,
28+
}
29+
_ = b.api.WriteEvent(evt)
30+
}
31+
sender.HandleRTP(track)
32+
b.Senders = append(b.Senders, sender)
33+
return nil
34+
}
35+
36+
func (b *Backchannel) Start() error {
37+
for {
38+
if _, err := b.api.ReadEvent(); err != nil {
39+
return err
40+
}
41+
}
42+
}

pkg/wyoming/producer.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package wyoming
2+
3+
import (
4+
"github.com/AlexxIT/go2rtc/pkg/core"
5+
"github.com/pion/rtp"
6+
)
7+
8+
type Producer struct {
9+
core.Connection
10+
api *API
11+
}
12+
13+
func (p *Producer) Start() error {
14+
var seq uint16
15+
var ts uint32
16+
17+
for {
18+
evt, err := p.api.ReadEvent()
19+
if err != nil {
20+
return err
21+
}
22+
23+
if evt.Type != "audio-chunk" {
24+
continue
25+
}
26+
27+
p.Recv += len(evt.Payload)
28+
29+
pkt := &core.Packet{
30+
Header: rtp.Header{
31+
Version: 2,
32+
Marker: true,
33+
SequenceNumber: seq,
34+
Timestamp: ts,
35+
},
36+
Payload: evt.Payload,
37+
}
38+
p.Receivers[0].WriteRTP(pkt)
39+
40+
seq++
41+
ts += uint32(len(evt.Payload) / 2)
42+
}
43+
}

0 commit comments

Comments
 (0)