Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: yutopp/go-rtmp
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v0.0.2
Choose a base ref
...
head repository: yutopp/go-rtmp
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v0.0.3
Choose a head ref
  • 3 commits
  • 3 files changed
  • 2 contributors

Commits on Sep 28, 2022

  1. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature.
    Copy the full SHA
    b76a40b View commit details

Commits on Sep 29, 2022

  1. Verified

    This commit was signed with the committer’s verified signature.
    darinpope Darin Pope
    Copy the full SHA
    3e0521f View commit details

Commits on Jan 1, 2023

  1. Merge pull request #49 from diallo-han/master

    Fix bug writing same timestamp with different messageid.
    yutopp authored Jan 1, 2023
    Copy the full SHA
    b05bb74 View commit details
Showing with 108 additions and 8 deletions.
  1. +5 −4 chunk_stream_writer.go
  2. +8 −4 chunk_streamer.go
  3. +95 −0 chunk_streamer_test.go
9 changes: 5 additions & 4 deletions chunk_stream_writer.go
Original file line number Diff line number Diff line change
@@ -15,10 +15,11 @@ import (
type ChunkStreamWriter struct {
ChunkStreamReader

doneCh chan struct{}
closeCh chan struct{}
lastErr error
aqM sync.Mutex
doneCh chan struct{}
closeCh chan struct{}
lastErr error
aqM sync.Mutex
newChunk bool
}

func (w *ChunkStreamWriter) Write(b []byte) (int, error) {
12 changes: 8 additions & 4 deletions chunk_streamer.go
Original file line number Diff line number Diff line change
@@ -168,6 +168,7 @@ func (cs *ChunkStreamer) NewChunkWriter(ctx context.Context, chunkStreamID int)
}

func (cs *ChunkStreamer) Sched(writer *ChunkStreamWriter) error {
writer.newChunk = true
return cs.writerSched.Sched(writer)
}

@@ -305,13 +306,14 @@ func (cs *ChunkStreamer) writeChunk(writer *ChunkStreamWriter) (bool, error) {

func (cs *ChunkStreamer) updateWriterHeader(writer *ChunkStreamWriter) {
fmt := byte(2) // default: only timestamp delta
if writer.messageHeader.messageLength != writer.messageLength || writer.messageTypeID != writer.messageHeader.messageTypeID {
if writer.messageHeader.messageLength != writer.messageLength ||
writer.messageTypeID != writer.messageHeader.messageTypeID {
// header or type id is updated, change fmt to 1 to notify difference and update state
writer.messageHeader.messageLength = writer.messageLength
writer.messageHeader.messageTypeID = writer.messageTypeID
fmt = 1
}
if writer.timestamp != writer.messageHeader.timestamp {
if writer.timestamp != writer.messageHeader.timestamp || writer.newChunk {
if writer.timestamp >= writer.messageHeader.timestamp {
writer.timestampDelta = writer.timestamp - writer.messageHeader.timestamp
} else {
@@ -320,6 +322,7 @@ func (cs *ChunkStreamer) updateWriterHeader(writer *ChunkStreamWriter) {
writer.timestampDelta = 0
}
}
writer.newChunk = false
if writer.timestampDelta == writer.messageHeader.timestampDelta && fmt == 2 {
fmt = 3
}
@@ -409,8 +412,9 @@ func (cs *ChunkStreamer) prepareChunkWriter(chunkStreamID int) (*ChunkStreamWrit
timestamp: math.MaxUint32, // initial state will be updated by writer.timestamp
},
},
doneCh: make(chan struct{}),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
closeCh: make(chan struct{}),
newChunk: true,
}
close(writer.doneCh)
cs.writers[chunkStreamID] = writer
95 changes: 95 additions & 0 deletions chunk_streamer_test.go
Original file line number Diff line number Diff line change
@@ -249,6 +249,101 @@ func TestStreamerChunkExample1(t *testing.T) {
}
}

func TestStreamerChunkExample2(t *testing.T) {
type write struct {
timestamp uint32
length int
messageTypeId byte
}

type read struct {
timestamp uint32
delta uint32
fmt byte
isComplete bool
}

type testCase struct {
name string
chunkStreamID int
messageStreamID uint32
writeCases []write
readCases []read
}

tcs := []testCase{
// Same timestamp
{
name: "Same timestamp's delta #1",
chunkStreamID: 5,
messageStreamID: 22346,
writeCases: []write{
{timestamp: 1000, length: 200, messageTypeId: 10},
{timestamp: 1001, length: 200, messageTypeId: 11},
{timestamp: 2000, length: 200, messageTypeId: 10},
{timestamp: 2000, length: 200, messageTypeId: 11},
},
readCases: []read{
{timestamp: 1000, delta: 0, fmt: 0, isComplete: false},
{timestamp: 1000, delta: 0, fmt: 3, isComplete: true},

{timestamp: 1000, delta: 1, fmt: 1, isComplete: false},
{timestamp: 1001, delta: 0, fmt: 3, isComplete: true},

{timestamp: 1001, delta: 999, fmt: 1, isComplete: false},
{timestamp: 2000, delta: 0, fmt: 3, isComplete: true},

{timestamp: 2000, delta: 0, fmt: 1, isComplete: false},
{timestamp: 2000, delta: 0, fmt: 3, isComplete: true},
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
buf := bytes.NewBuffer(make([]byte, 0, 2048))
inbuf := bufio.NewReaderSize(buf, 2048)
outbuf := bufio.NewWriterSize(buf, 2048)

streamer := NewChunkStreamer(inbuf, outbuf, nil)

for i, wc := range tc.writeCases {
t.Run(fmt.Sprintf("Write: %d", i), func(t *testing.T) {
w, err := streamer.NewChunkWriter(context.Background(), tc.chunkStreamID)
assert.Nil(t, err)
assert.NotNil(t, w)

bin := make([]byte, wc.length)

w.messageLength = uint32(len(bin))
w.messageTypeID = byte(wc.messageTypeId)
w.messageStreamID = tc.messageStreamID
w.timestamp = wc.timestamp
w.buf.Write(bin)
err = streamer.Sched(w)
assert.Nil(t, err)
})
}

_, err := streamer.NewChunkWriter(context.Background(), tc.chunkStreamID) // wait for writing
assert.Nil(t, err)

for i, rc := range tc.readCases {
t.Run(fmt.Sprintf("Read: %d", i), func(t *testing.T) {
r, err := streamer.readChunk()
_ = rc
_ = err
assert.Nil(t, err)
assert.NotNil(t, r)
assert.Equal(t, rc.fmt, r.basicHeader.fmt)
assert.Equal(t, uint32(rc.delta), r.messageHeader.timestampDelta)
assert.Equal(t, rc.isComplete, r.completed)
})
}
})
}
}

func TestWriteToInvalidWriter(t *testing.T) {
buf := bytes.NewBuffer(make([]byte, 0, 2048))
inbuf := bufio.NewReaderSize(buf, 2048)