Skip to content

Commit

Permalink
Handle PTS rollovers
Browse files Browse the repository at this point in the history
MPEG-TS stores packet positions using 33 bits, which means that for long
running streams the position will rollover approximately every 26.5 hours.

TV Kitchen appliances need to be able to ingest videos and streams of
any length, which means when rollovers occur the appliance needs to
properly update the origin times to account for the rollover.

Issue #130
  • Loading branch information
slifty committed Jun 30, 2021
1 parent 195005c commit af28c0c
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 6 deletions.
29 changes: 26 additions & 3 deletions packages/core/src/AbstractVideoIngestionAppliance.js
Expand Up @@ -16,6 +16,8 @@ import { AbstractAppliance } from './AbstractAppliance'
import {
tsToMilliseconds,
generateEmptyPacket,
areDiscontinuousPositions,
getRolloverTimestamp,
} from './utils/mpegts'

/**
Expand Down Expand Up @@ -45,6 +47,16 @@ class AbstractVideoIngestionAppliance extends AbstractAppliance {
// A Writeable stream that will ingest Payloads into the TV Kitchen pipeline.
payloadIngestionStream = null

// The number of times the TS stream has rolled over it's timestamps
// This happens once every 2^33 / 90000 MS
rolloverCount = 0

// A tracker for the most recent position emitted by the MPEG-TS demuxer
latestPosition = 0

// A tracker for the most recent MPEG-TS origin point, accounting for rollovers
rolloverOrigin = ''

/**
* Create a AbstractVideoIngestionAppliance.
*
Expand All @@ -60,6 +72,7 @@ class AbstractVideoIngestionAppliance extends AbstractAppliance {
throw new AbstractInstantiationError(this.constructor.name)
}

this.rolloverOrigin = this.settings.origin
this.mpegTsDemuxer = new MpegTsDemuxer()
this.mpegTsDemuxer.on(
'data',
Expand Down Expand Up @@ -147,14 +160,24 @@ class AbstractVideoIngestionAppliance extends AbstractAppliance {
processMpegtsStreamData(mpegtsData, enc, done) {
this.mpegTsDemuxer.write(mpegtsData)
const demuxedPacket = this.getMostRecentDemuxedPacket() || generateEmptyPacket()
const position = tsToMilliseconds(demuxedPacket.pts)
const newPosition = tsToMilliseconds(demuxedPacket.pts)

if (areDiscontinuousPositions(this.latestPosition, newPosition)) {
this.rolloverCount += 1
this.rolloverOrigin = getRolloverTimestamp(
this.settings.origin,
this.rolloverCount,
)
}
this.latestPosition = newPosition

const payload = new Payload({
data: mpegtsData,
type: dataTypes.STREAM.CONTAINER,
duration: 0,
position,
position: this.latestPosition,
createdAt: (new Date()).toISOString(),
origin: this.settings.origin,
origin: this.rolloverOrigin,
})
done(null, payload)
}
Expand Down
Expand Up @@ -117,7 +117,7 @@ describe('AbstractVideoIngestionAppliance #unit', () => {
ingestionAppliance.mpegtsDemuxer = {
process: jest.fn(),
}
ingestionAppliance.getMostRecentDemuxedPacket = jest.fn().mockReturnValueOnce({
ingestionAppliance.getMostRecentDemuxedPacket = jest.fn().mockReturnValue({
pts: 90000,
})
const streamData = Buffer.from('testDataXYZ', 'utf8')
Expand Down
51 changes: 50 additions & 1 deletion packages/core/src/utils/__test__/mpegts.test.js
@@ -1,6 +1,8 @@
import {
tsToMilliseconds,
generateEmptyPacket,
areDiscontinuousPositions,
getRolloverTimestamp,
} from '../mpegts'

describe('mpegts utilities', () => {
Expand All @@ -9,7 +11,7 @@ describe('mpegts utilities', () => {
expect(tsToMilliseconds(90000)).toBe(1000)
})
it('should convert successfuly with an override basetime', () => {
expect(tsToMilliseconds(500, 200)).toBe(2500)
expect(tsToMilliseconds(500, 1 / 200)).toBe(2500)
})
})

Expand All @@ -30,4 +32,51 @@ describe('mpegts utilities', () => {
})
})
})

describe('areDiscontinuousPositions', () => {
it('should return false if positions are monotonically increasing', () => {
expect(areDiscontinuousPositions(0, 1)).toBe(false)
expect(areDiscontinuousPositions(0, 0)).toBe(false)
expect(areDiscontinuousPositions(1000, 15000)).toBe(false)
})

it('should return true if positions are decreasing', () => {
expect(areDiscontinuousPositions(10, 0)).toBe(true)
expect(areDiscontinuousPositions(0, -1)).toBe(true)
expect(areDiscontinuousPositions(10000, 5000)).toBe(true)
})
})

describe('getRolloverTimestamp', () => {
it('should not modify the timestamp if there are no rollovers', () => {
const baseTimestamp = '2021-06-30T18:30:25.205Z'
expect(getRolloverTimestamp(baseTimestamp, 0))
.toEqual(baseTimestamp)
})
it('should increase the timestamp by the default PTS rollover if there is a rollover', () => {
const baseTimestamp = '2021-06-30T18:30:25.205Z'
const singleRolloverTimestamp = '2021-07-01T21:01:08.922Z' // add 2^33 / 90000 seconds to the timestamp
expect(getRolloverTimestamp(baseTimestamp, 1))
.toEqual(singleRolloverTimestamp)
})
it('should default the rolloverCount to 1', () => {
const baseTimestamp = '2021-06-30T18:30:25.205Z'
const singleRolloverTimestamp = '2021-07-01T21:01:08.922Z' // add 2^33 / 90000 seconds to the timestamp
expect(getRolloverTimestamp(baseTimestamp))
.toEqual(singleRolloverTimestamp)
})
it('should increase the timestamp by the default PTS rollover multiple times if there is more than one rollover', () => {
const baseTimestamp = '2021-06-30T18:30:25.205Z'
const doubleRolloverTimestamp = '2021-07-02T23:31:52.640Z' // add 2 * 2^33 / 90000 seconds to the timestamp
expect(getRolloverTimestamp(baseTimestamp, 2))
.toEqual(doubleRolloverTimestamp)
})
it('should increase the timestamp based on the baseTime', () => {
const baseTimestamp = '2021-06-30T18:30:25.205Z'
const baseTime = 1 / 50000
const singleRolloverTimestamp = '2021-07-02T18:13:43.896Z' // add 2^33 / 50000 seconds to the timestamp
expect(getRolloverTimestamp(baseTimestamp, 1, baseTime))
.toEqual(singleRolloverTimestamp)
})
})
})
37 changes: 36 additions & 1 deletion packages/core/src/utils/mpegts.js
@@ -1,11 +1,20 @@
// MPEG-TS stores PTS in 33 bits, so positions rollover back to zero after a time.
const MAX_PTS = 2 ** 33

// MPEG-TS positions are stored as 1/90000'th of a second
const DEFAULT_BASETIME = 1 / 90000

/**
* Converts a pts or dts extracted from an MPEG-TS packet to seconds.
*
* @param {Number} ts The pts or dts value to be converted.
* @param {Number} baseTime The MPEG-TS basetime associated with the ts.
* @return {Number} The number of seconds represented by the pts or dts.
*/
export const tsToMilliseconds = (ts, baseTime = 90000) => +((ts / baseTime) * 1000).toFixed(0)
export function tsToMilliseconds(ts, baseTime = DEFAULT_BASETIME) {
const baseTimeMs = baseTime * 1000
return +(ts * baseTimeMs).toFixed(0)
}

/**
* Exports a TSDemuxer MPEG-TS Packet as defined in the TSDemuxer project.
Expand All @@ -24,3 +33,29 @@ export const generateEmptyPacket = () => ({
content_type: 0,
frame_num: 0,
})

/**
* Determines if a pair of timestamps are discontinuous.
*
* Two timestamps are discontinuous if the new timestamp is before the
* old timestamp (if the pair is not monotonically increasing).
*
* @return {Boolean}
*/
export function areDiscontinuousPositions(firstPosition, secondPosition) {
return firstPosition > secondPosition
}

/**
* Calculates a timestamp that is offset by a number of PTS rollovers.
*
* @param {string} timestamp The ISO string of the timestamp being offset.
* @param {number} rolloverCount The number of PTS rollovers to add to the timestamp.
* @return {string} The ISO string of the resulting timestamp.
*/
export function getRolloverTimestamp(timestamp, rolloverCount = 1, baseTime = DEFAULT_BASETIME) {
const baseTimeMs = baseTime * 1000
const rolloverMs = rolloverCount * MAX_PTS * baseTimeMs
const updatedMs = (new Date(timestamp)).getTime() + rolloverMs
return (new Date(updatedMs)).toISOString()
}

0 comments on commit af28c0c

Please sign in to comment.