Skip to content

Commit

Permalink
Only track video streams
Browse files Browse the repository at this point in the history
The AbstractVideoIngestionAppliance keeps track of the position associated
with the stream; however, in MPEG-TS there is more than one stream (e.g.
audio and video) which means there is actually more than one stream
position.

This was causing issues where position was jumping around back and forth
because all positions were treated as being from a single stream.  We
now only care about the video stream.

It may be that an MPEG-TS stream can contain multiple video streams,
which will break this code; that said, TV Kitchen is fairly built around
an assumption that a given video stream has a single video in it.

In the long run we may want to also track audio stream position, but
honestly at that stage we would probably want the ingestor to output
multiple demuxed and remuxed streams instead of simply forwarding the
stream.

Issue #132
  • Loading branch information
slifty committed Jul 2, 2021
1 parent 5fe7c5f commit 2d2fc33
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 25 deletions.
3 changes: 3 additions & 0 deletions packages/core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- The `AbstractVideoIngestionAppliance` now only internally tracks the position of the video stream in a given MPEG-TS stream, as opposed to tracking all streams.

### Fixed
- The optional `baseTime` parameter is now in terms of the fraction instead of the denominator of the fraction (e.g. `1/90000` instead of `90000`).
- `AbstractVideoIngestionAppliance` now supports streams that have been running long enough for a PTS rollover to occur.
Expand Down
22 changes: 14 additions & 8 deletions packages/core/src/AbstractVideoIngestionAppliance.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import {
getRolloverTimestamp,
} from './utils/mpegts'

// This is defined in the demuxer we're using, but it is
// not exported so we have to re-define it here.
const MPEGTS_CONTENT_TYPE_VIDEO = 2

/**
* An AbstractVideoIngestionAppliance provides generic functionality for converting an arbitrary
* video input stream into VIDEO.CONTAINER payloads.
Expand All @@ -39,7 +43,7 @@ class AbstractVideoIngestionAppliance extends AbstractAppliance {
mpegTsDemuxer = null

// A shim variable that allows us to use the output of TSDemuxer in our engine
mostRecentDemuxedPacket = null
mostRecentDemuxedVideoPacket = null

// A Transformation stream that will convert MPEG-TS data into TV Kitchen Payloads
mpegtsProcessingStream = null
Expand Down Expand Up @@ -123,15 +127,17 @@ class AbstractVideoIngestionAppliance extends AbstractAppliance {
}

/**
* Updates ingestion state based on the latest demuxed packet data.
* Updates ingestion state based on the latest demuxed video packet data.
*
* This method is called by our MPEG-TS demuxer, and allows the ingestion engine to track
* the most recent demuxed packet.
* the most recent demuxed video packet.
*
* @param {Packet} packet The latest TSDemuxer Packet object
*/
onDemuxedPacket(packet) {
this.mostRecentDemuxedPacket = packet
if (packet.content_type === MPEGTS_CONTENT_TYPE_VIDEO) {
this.mostRecentDemuxedVideoPacket = packet
}
}

/**
Expand All @@ -141,8 +147,8 @@ class AbstractVideoIngestionAppliance extends AbstractAppliance {
*
* @return {Packet} The most recent Packet object extracted by TSDemuxer
*/
getMostRecentDemuxedPacket() {
return this.mostRecentDemuxedPacket
getMostRecentDemuxedVideoPacket() {
return this.mostRecentDemuxedVideoPacket
}

/**
Expand All @@ -159,8 +165,8 @@ class AbstractVideoIngestionAppliance extends AbstractAppliance {
*/
processMpegtsStreamData(mpegtsData, enc, done) {
this.mpegTsDemuxer.write(mpegtsData)
const demuxedPacket = this.getMostRecentDemuxedPacket() || generateEmptyPacket()
const newPosition = tsToMilliseconds(demuxedPacket.pts)
const demuxedVideoPacket = this.getMostRecentDemuxedVideoPacket() || generateEmptyPacket()
const newPosition = tsToMilliseconds(demuxedVideoPacket.pts)

if (areDiscontinuousPositions(this.latestPosition, newPosition)) {
this.rolloverCount += 1
Expand Down
38 changes: 21 additions & 17 deletions packages/core/src/__test__/AbstractVideoIngestionAppliance.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ describe('AbstractVideoIngestionAppliance #unit', () => {
ingestionAppliance.mpegtsDemuxer = {
process: jest.fn(),
}
ingestionAppliance.getMostRecentDemuxedPacket = jest.fn().mockReturnValueOnce({
pts: 90000,
})
const testData = loadTestData(__dirname, 'processMpegtsStreamData.json')
const videoPacket = testData[0]
ingestionAppliance.onDemuxedPacket(videoPacket)
const streamData = Buffer.from('testDataXYZ', 'utf8')
ingestionAppliance.processMpegtsStreamData(streamData, null, (err, result) => {
expect(result.position).toEqual(1000)
expect(result.position).toEqual(13946)
})
})
it('should correctly decorate the Payload createdAt', () => {
Expand All @@ -100,9 +100,9 @@ describe('AbstractVideoIngestionAppliance #unit', () => {
ingestionAppliance.mpegtsDemuxer = {
process: jest.fn(),
}
ingestionAppliance.getMostRecentDemuxedPacket = jest.fn().mockReturnValueOnce({
pts: 90000,
})
const testData = loadTestData(__dirname, 'processMpegtsStreamData.json')
const videoPacket = testData[0]
ingestionAppliance.onDemuxedPacket(videoPacket)
const streamData = Buffer.from('testDataXYZ', 'utf8')
ingestionAppliance.processMpegtsStreamData(streamData, null, (err, result) => {
expect(typeof result.createdAt).toBe('string')
Expand Down Expand Up @@ -130,26 +130,30 @@ describe('AbstractVideoIngestionAppliance #unit', () => {
describe('onDemuxedPacket', () => {
it('should register new packets as most recent', () => {
const testData = loadTestData(__dirname, 'onDemuxedPacket.json')
const demuxedPacket = testData[0]
const demuxedPacket2 = testData[1]
const videoPacket1 = testData[0]
const videoPacket2 = testData[1]
const audioPacket1 = testData[1]
const ingestionAppliance = new FullyImplementedVideoIngestionAppliance()
ingestionAppliance.onDemuxedPacket(demuxedPacket)
ingestionAppliance.onDemuxedPacket(demuxedPacket2)
expect(ingestionAppliance.mostRecentDemuxedPacket).toEqual(demuxedPacket2)
ingestionAppliance.onDemuxedPacket(videoPacket1)
expect(ingestionAppliance.mostRecentDemuxedVideoPacket).toEqual(videoPacket1)
ingestionAppliance.onDemuxedPacket(videoPacket2)
expect(ingestionAppliance.mostRecentDemuxedVideoPacket).toEqual(videoPacket2)
ingestionAppliance.onDemuxedPacket(audioPacket1)
expect(ingestionAppliance.mostRecentDemuxedVideoPacket).toEqual(videoPacket2)
})
})

describe('getMostRecentDemuxedPacket', () => {
describe('getMostRecentDemuxedVideoPacket', () => {
it('should return the value in most recent demuxed packet', () => {
const testData = loadTestData(__dirname, 'getMostRecentDemuxedPacket.json')
const demuxedPacket = testData[0]
const videoPacket = testData[0]
const ingestionAppliance = new FullyImplementedVideoIngestionAppliance()
ingestionAppliance.mostRecentDemuxedPacket = demuxedPacket
expect(ingestionAppliance.getMostRecentDemuxedPacket()).toEqual(demuxedPacket)
ingestionAppliance.onDemuxedPacket(videoPacket)
expect(ingestionAppliance.getMostRecentDemuxedVideoPacket()).toEqual(videoPacket)
})
it('should return null if nothing has been processed', () => {
const ingestionAppliance = new FullyImplementedVideoIngestionAppliance()
expect(ingestionAppliance.getMostRecentDemuxedPacket()).toBe(null)
expect(ingestionAppliance.getMostRecentDemuxedVideoPacket()).toBe(null)
})
})

Expand Down
16 changes: 16 additions & 0 deletions packages/core/src/__test__/data/processMpegtsStreamData.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[
{
"data": [
0, 0, 1, 0, 0, 215, 255, 251, 128, 0, 0, 1
],
"pts": 1255128,
"dts": 1252125,
"frame_ticks": 3003,
"program": 1,
"stream_number": 1,
"stream_id": 224,
"type": 1,
"content_type": 2,
"frame_num": 377
}
]

0 comments on commit 2d2fc33

Please sign in to comment.