fix(offline): Speed up offline storage by ~87% (#4176)
By waiting for all segment data to be written to the database before updating the manifest, we can speed up offline storage in the foreground by ~87%.

Closes #4166
joeyparrish committed May 17, 2022
1 parent edc85b9 commit dc27a26
Showing 3 changed files with 58 additions and 108 deletions.
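To make the change concrete before diving into the diff, here is a minimal sketch of the pattern described in the commit message. It assumes hypothetical storeSegment, loadManifest, saveManifest, and assignRef helpers rather than the real Shaka Player storage-cell APIs: the old path reloaded and rewrote the manifest record once per downloaded segment (serialized by a mutex), while the new path accumulates the pending key assignments and byte counts in memory and touches the manifest only once per batch.

// Minimal sketch only; storeSegment, loadManifest, saveManifest, and assignRef
// are hypothetical stand-ins, not the actual Shaka Player storage APIs.

// Old approach: one manifest read-modify-write per segment, behind a mutex.
async function storeSegmentsOld(segments, manifestId) {
  for (const segment of segments) {
    const dataKey = await storeSegment(segment.data);
    const manifest = await loadManifest(manifestId);  // reload every time
    assignRef(manifest, segment.refId, dataKey);
    manifest.size += segment.data.byteLength;
    await saveManifest(manifestId, manifest);  // rewrite every time
  }
}

// New approach: remember the pending updates in memory, then load and write
// the manifest exactly once after the whole batch of segments has been stored.
async function storeSegmentsNew(segments, manifestId) {
  const pendingUpdates = {};  // segment ref id -> database key
  let pendingDataSize = 0;

  await Promise.all(segments.map(async (segment) => {
    const dataKey = await storeSegment(segment.data);
    pendingUpdates[segment.refId] = dataKey;
    pendingDataSize += segment.data.byteLength;
  }));

  const manifest = await loadManifest(manifestId);  // single read
  for (const [refId, dataKey] of Object.entries(pendingUpdates)) {
    assignRef(manifest, refId, dataKey);
  }
  manifest.size += pendingDataSize;
  await saveManifest(manifestId, manifest);  // single write
}

With N segments, the old pattern performs N manifest reads and N manifest writes, while the new one performs a single read and a single write per batch. This is also why the mutex can be deleted: the manifest is only updated once per batch, and (per the new JSDoc below) the caller is responsible for not running batches against the same manifest concurrently.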
1 change: 0 additions & 1 deletion build/types/core
@@ -84,7 +84,6 @@
+../../lib/util/mp4_box_parsers.js
+../../lib/util/mp4_parser.js
+../../lib/util/multi_map.js
+../../lib/util/mutex.js
+../../lib/util/networking.js
+../../lib/util/object_utils.js
+../../lib/util/operation_manager.js
113 changes: 58 additions & 55 deletions lib/offline/storage.js
@@ -29,7 +29,6 @@ goog.require('shaka.util.Functional');
goog.require('shaka.util.IDestroyable');
goog.require('shaka.util.Iterables');
goog.require('shaka.util.MimeUtils');
goog.require('shaka.util.Mutex');
goog.require('shaka.util.Platform');
goog.require('shaka.util.PlayerConfiguration');
goog.require('shaka.util.StreamUtils');
@@ -210,7 +209,7 @@ shaka.offline.Storage = class {
}

goog.asserts.assert(
this.config_, 'Cannot reconfigure stroage after calling destroy.');
this.config_, 'Cannot reconfigure storage after calling destroy.');
return shaka.util.PlayerConfiguration.mergeConfigObjects(
/* destination= */ this.config_, /* updates= */ config );
}
@@ -479,45 +478,43 @@ shaka.offline.Storage = class {
async downloadSegments_(
toDownload, manifestId, manifestDB, downloader, config, storage,
manifest, drmEngine) {
let pendingManifestUpdates = {};
let pendingDataSize = 0;

/**
* @param {!Array.<!shaka.offline.DownloadInfo>} toDownload
* @param {boolean} updateDRM
*/
const download = async (toDownload, updateDRM) => {
const throwIfAbortedFn = () => {
this.ensureNotDestroyed_();
};
for (const download of toDownload) {
/** @param {?BufferSource} data */
let data;
const request = download.makeSegmentRequest(config);
const estimateId = download.estimateId;
const isInitSegment = download.isInitSegment;
const onDownloaded = (d) => {
data = d;
return Promise.resolve();
};
downloader.queue(download.groupId,
request, estimateId, isInitSegment, onDownloaded);
downloader.queueWork(download.groupId, async () => {
goog.asserts.assert(data, 'We should have loaded data by now');
goog.asserts.assert(data instanceof ArrayBuffer,
'The data should be an ArrayBuffer');

const onDownloaded = async (data) => {
// Store the data.
const dataKeys = await storage.addSegments([{data}]);
this.ensureNotDestroyed_();

// Store the necessary update to the manifest, to be processed later.
const ref = /** @type {!shaka.media.SegmentReference} */ (
download.ref);
manifestDB = (await shaka.offline.Storage.assignStreamToManifest(
manifestId, ref, {data}, throwIfAbortedFn)) || manifestDB;
});
const id = shaka.offline.DownloadInfo.idForSegmentRef(ref);
pendingManifestUpdates[id] = dataKeys[0];
pendingDataSize += data.byteLength;
};

downloader.queue(download.groupId,
request, estimateId, isInitSegment, onDownloaded);
}
await downloader.waitToFinish();

if (updateDRM) {
// Re-store the manifest, to attach session IDs.
// These were (maybe) discovered inside the downloader; we can only add
// them now, at the end, since the manifestDB is in flux during the
// process of downloading and storing, and assignStreamToManifest does
// not know about the DRM engine.
// process of downloading and storing, and assignSegmentsToManifest
// does not know about the DRM engine.
this.ensureNotDestroyed_();
this.setManifestDrmFields_(manifest, manifestDB, drmEngine, config);
await storage.updateManifest(manifestId, manifestDB);
@@ -534,12 +531,30 @@ shaka.offline.Storage = class {
await download(toDownload.filter((info) => info.isInitSegment), true);
this.ensureNotDestroyed_();
toDownload = toDownload.filter((info) => !info.isInitSegment);

// Copy these and reset them now, before the next await.
const manifestUpdates = pendingManifestUpdates;
const dataSize = pendingDataSize;
pendingManifestUpdates = {};
pendingDataSize = 0;

manifestDB =
(await shaka.offline.Storage.assignSegmentsToManifest(
manifestId, manifestUpdates, dataSize,
() => this.ensureNotDestroyed_())) || manifestDB;
this.ensureNotDestroyed_();
}

if (!usingBgFetch) {
await download(toDownload, false);
this.ensureNotDestroyed_();

manifestDB =
(await shaka.offline.Storage.assignSegmentsToManifest(
manifestId, pendingManifestUpdates, pendingDataSize,
() => this.ensureNotDestroyed_())) || manifestDB;
this.ensureNotDestroyed_();

goog.asserts.assert(
!manifestDB.isIncomplete, 'The manifest should be complete by now');
} else {
@@ -567,48 +582,33 @@
}

/**
* Load the given manifest, modifies it by assigning the given data to the
* segments corresponding to "ref", then re-stores the manifest.
* The parts of this function that modify the manifest are protected by a
* mutex, to prevent race conditions; specifically, it prevents two parallel
* instances of this method from both loading the manifest into memory at the
* same time, which would result in the slower/later call overwriting the
* changes of the other.
* Loads the given manifest, assigns database keys to all of its segments, then
* stores the updated manifest.
*
* It is up to the caller to ensure that this method is not called
* concurrently on the same manifest.
*
* @param {number} manifestId
* @param {!shaka.media.SegmentReference} ref
* @param {shaka.extern.SegmentDataDB} data
* @param {!Object.<string, number>} manifestUpdates
* @param {number} dataSizeUpdate
* @param {function()} throwIfAbortedFn A function that should throw if the
* download has been aborted.
* @return {!Promise.<?shaka.extern.ManifestDB>}
*/
static async assignStreamToManifest(manifestId, ref, data, throwIfAbortedFn) {
static async assignSegmentsToManifest(
manifestId, manifestUpdates, dataSizeUpdate, throwIfAbortedFn) {
/** @type {shaka.offline.StorageMuxer} */
const muxer = new shaka.offline.StorageMuxer();

const idForRef = shaka.offline.DownloadInfo.idForSegmentRef(ref);
let manifestUpdated = false;
let dataKey;
let activeHandle;
/** @type {!shaka.extern.ManifestDB} */
let manifestDB;

let mutexId = 0;

try {
await muxer.init();
activeHandle = await muxer.getActive();

// Store the data.
const dataKeys = await activeHandle.cell.addSegments([data]);
dataKey = dataKeys[0];
throwIfAbortedFn();

// Acquire the mutex before accessing the manifest, since there could be
// multiple instances of this method running at once.
mutexId = await shaka.offline.Storage.mutex_.acquire();
throwIfAbortedFn();

// Load the manifest.
const manifests = await activeHandle.cell.getManifests([manifestId]);
throwIfAbortedFn();
@@ -618,19 +618,25 @@
let complete = true;
for (const stream of manifestDB.streams) {
for (const segment of stream.segments) {
if (segment.pendingSegmentRefId == idForRef) {
let dataKey = segment.pendingSegmentRefId ?
manifestUpdates[segment.pendingSegmentRefId] : null;
if (dataKey != null) {
segment.dataKey = dataKey;
// Now that the segment has been associated with the appropriate
// dataKey, the pendingSegmentRefId is no longer necessary.
segment.pendingSegmentRefId = undefined;
}
if (segment.pendingInitSegmentRefId == idForRef) {

dataKey = segment.pendingInitSegmentRefId ?
manifestUpdates[segment.pendingInitSegmentRefId] : null;
if (dataKey != null) {
segment.initSegmentKey = dataKey;
// Now that the init segment has been associated with the
// appropriate initSegmentKey, the pendingInitSegmentRefId is no
// longer necessary.
segment.pendingInitSegmentRefId = undefined;
}

if (segment.pendingSegmentRefId) {
complete = false;
}
@@ -641,7 +647,7 @@
}

// Update the size of the manifest.
manifestDB.size += data.data.byteLength;
manifestDB.size += dataSizeUpdate;

// Mark the manifest as complete, if all segments are downloaded.
if (complete) {
@@ -655,17 +661,17 @@
} catch (e) {
await shaka.offline.Storage.cleanStoredManifest(manifestId);

if (activeHandle && !manifestUpdated && dataKey) {
if (activeHandle && !manifestUpdated) {
const dataKeys = Object.values(manifestUpdates);
// The cleanStoredManifest method will not "see" any segments that have
// been downloaded but not assigned to the manifest yet. So un-store
// them separately.
await activeHandle.cell.removeSegments([dataKey], (key) => {});
await activeHandle.cell.removeSegments(dataKeys, (key) => {});
}

throw e;
} finally {
await muxer.destroy();
shaka.offline.Storage.mutex_.release(mutexId);
}
return manifestDB;
}
@@ -1662,9 +1668,6 @@ shaka.offline.Storage = class {
}
};

/** @private {!shaka.util.Mutex} */
shaka.offline.Storage.mutex_ = new shaka.util.Mutex();

shaka.offline.Storage.defaultSystemIds_ = new Map()
.set('org.w3.clearkey', '1077efecc0b24d02ace33c1e52e2fb4b')
.set('com.widevine.alpha', 'edef8ba979d64acea3c827dcd51d21ed')
52 changes: 0 additions & 52 deletions lib/util/mutex.js

This file was deleted.
