Skip to content

Commit

Permalink
Handle PTS rollovers
Browse files Browse the repository at this point in the history
MPEG-TS stores packet positions using 33 bits, which means that for long
running streams the position will rollover approximately every 26.5 hours.

TV Kitchen appliances need to be able to ingest videos and streams of
any length, which means when rollovers occur the appliance needs to
properly update the origin times to account for the rollover.

Issue #130
  • Loading branch information
slifty committed Jun 30, 2021
1 parent 5473c30 commit 51c19ed
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 3 deletions.
1 change: 1 addition & 0 deletions packages/core/CHANGELOG.md
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Fixed
- The optional `baseTime` parameter is now in terms of the fraction instead of the denominator of the fraction (e.g. `1/90000` instead of `90000`).
- `AbstractVideoIngestionAppliance` now supports streams that have been running long enough for a PTS rollover to occur.

## [0.7.0] - 2020-05-13
### Added
Expand Down
29 changes: 26 additions & 3 deletions packages/core/src/AbstractVideoIngestionAppliance.js
Expand Up @@ -16,6 +16,8 @@ import { AbstractAppliance } from './AbstractAppliance'
import {
tsToMilliseconds,
generateEmptyPacket,
areDiscontinuousPositions,
getRolloverTimestamp,
} from './utils/mpegts'

/**
Expand Down Expand Up @@ -45,6 +47,16 @@ class AbstractVideoIngestionAppliance extends AbstractAppliance {
// A Writeable stream that will ingest Payloads into the TV Kitchen pipeline.
payloadIngestionStream = null

// The number of times the TS stream has rolled over it's timestamps
// This happens once every 2^33 / 90000 MS
rolloverCount = 0

// A tracker for the most recent position emitted by the MPEG-TS demuxer
latestPosition = 0

// A tracker for the most recent MPEG-TS origin point, accounting for rollovers
rolloverOrigin = ''

/**
* Create a AbstractVideoIngestionAppliance.
*
Expand All @@ -60,6 +72,7 @@ class AbstractVideoIngestionAppliance extends AbstractAppliance {
throw new AbstractInstantiationError(this.constructor.name)
}

this.rolloverOrigin = this.settings.origin
this.mpegTsDemuxer = new MpegTsDemuxer()
this.mpegTsDemuxer.on(
'data',
Expand Down Expand Up @@ -147,14 +160,24 @@ class AbstractVideoIngestionAppliance extends AbstractAppliance {
processMpegtsStreamData(mpegtsData, enc, done) {
this.mpegTsDemuxer.write(mpegtsData)
const demuxedPacket = this.getMostRecentDemuxedPacket() || generateEmptyPacket()
const position = tsToMilliseconds(demuxedPacket.pts)
const newPosition = tsToMilliseconds(demuxedPacket.pts)

if (areDiscontinuousPositions(this.latestPosition, newPosition)) {
this.rolloverCount += 1
this.rolloverOrigin = getRolloverTimestamp(
this.settings.origin,
this.rolloverCount,
)
}
this.latestPosition = newPosition

const payload = new Payload({
data: mpegtsData,
type: dataTypes.STREAM.CONTAINER,
duration: 0,
position,
position: this.latestPosition,
createdAt: (new Date()).toISOString(),
origin: this.settings.origin,
origin: this.rolloverOrigin,
})
done(null, payload)
}
Expand Down
49 changes: 49 additions & 0 deletions packages/core/src/utils/__test__/mpegts.test.js
@@ -1,6 +1,8 @@
import {
tsToMilliseconds,
generateEmptyPacket,
areDiscontinuousPositions,
getRolloverTimestamp,
} from '../mpegts'

describe('mpegts utilities', () => {
Expand Down Expand Up @@ -30,4 +32,51 @@ describe('mpegts utilities', () => {
})
})
})

describe('areDiscontinuousPositions', () => {
it('should return false if positions are monotonically increasing', () => {
expect(areDiscontinuousPositions(0, 1)).toBe(false)
expect(areDiscontinuousPositions(0, 0)).toBe(false)
expect(areDiscontinuousPositions(1000, 15000)).toBe(false)
})

it('should return true if positions are decreasing', () => {
expect(areDiscontinuousPositions(10, 0)).toBe(true)
expect(areDiscontinuousPositions(0, -1)).toBe(true)
expect(areDiscontinuousPositions(10000, 5000)).toBe(true)
})
})

describe('getRolloverTimestamp', () => {
it('should not modify the timestamp if there are no rollovers', () => {
const baseTimestamp = '2021-06-30T18:30:25.205Z'
expect(getRolloverTimestamp(baseTimestamp, 0))
.toEqual(baseTimestamp)
})
it('should increase the timestamp by the default PTS rollover if there is a rollover', () => {
const baseTimestamp = '2021-06-30T18:30:25.205Z'
const singleRolloverTimestamp = '2021-07-01T21:01:08.922Z' // add 2^33 / 90000 seconds to the timestamp
expect(getRolloverTimestamp(baseTimestamp, 1))
.toEqual(singleRolloverTimestamp)
})
it('should default the rolloverCount to 1', () => {
const baseTimestamp = '2021-06-30T18:30:25.205Z'
const singleRolloverTimestamp = '2021-07-01T21:01:08.922Z' // add 2^33 / 90000 seconds to the timestamp
expect(getRolloverTimestamp(baseTimestamp))
.toEqual(singleRolloverTimestamp)
})
it('should increase the timestamp by the default PTS rollover multiple times if there is more than one rollover', () => {
const baseTimestamp = '2021-06-30T18:30:25.205Z'
const doubleRolloverTimestamp = '2021-07-02T23:31:52.640Z' // add 2 * 2^33 / 90000 seconds to the timestamp
expect(getRolloverTimestamp(baseTimestamp, 2))
.toEqual(doubleRolloverTimestamp)
})
it('should increase the timestamp based on the baseTime', () => {
const baseTimestamp = '2021-06-30T18:30:25.205Z'
const baseTime = 1 / 50000
const singleRolloverTimestamp = '2021-07-02T18:13:43.896Z' // add 2^33 / 50000 seconds to the timestamp
expect(getRolloverTimestamp(baseTimestamp, 1, baseTime))
.toEqual(singleRolloverTimestamp)
})
})
})
29 changes: 29 additions & 0 deletions packages/core/src/utils/mpegts.js
@@ -1,3 +1,6 @@
// MPEG-TS stores PTS in 33 bits, so positions rollover back to zero after a time.
const MAX_PTS = 2 ** 33

// MPEG-TS positions are stored as 1/90000'th of a second
const DEFAULT_BASETIME = 1 / 90000

Expand Down Expand Up @@ -30,3 +33,29 @@ export const generateEmptyPacket = () => ({
content_type: 0,
frame_num: 0,
})

/**
* Determines if a pair of timestamps are discontinuous.
*
* Two timestamps are discontinuous if the new timestamp is before the
* old timestamp (if the pair is not monotonically increasing).
*
* @return {Boolean}
*/
export function areDiscontinuousPositions(firstPosition, secondPosition) {
return firstPosition > secondPosition
}

/**
* Calculates a timestamp that is offset by a number of PTS rollovers.
*
* @param {string} timestamp The ISO string of the timestamp being offset.
* @param {number} rolloverCount The number of PTS rollovers to add to the timestamp.
* @return {string} The ISO string of the resulting timestamp.
*/
export function getRolloverTimestamp(timestamp, rolloverCount = 1, baseTime = DEFAULT_BASETIME) {
const baseTimeMs = baseTime * 1000
const rolloverMs = rolloverCount * MAX_PTS * baseTimeMs
const updatedMs = (new Date(timestamp)).getTime() + rolloverMs
return (new Date(updatedMs)).toISOString()
}

0 comments on commit 51c19ed

Please sign in to comment.