From c1c96135120480afc9615713812eecc4a51f153b Mon Sep 17 00:00:00 2001 From: Joey Parrish Date: Fri, 29 Apr 2022 16:34:23 -0700 Subject: [PATCH] fix(offline): Speed up offline storage by ~87% (#4176) By waiting for all segment data to be written to the database before updating the manifest, we can speed up offline storage in the foreground by ~87%. Closes #4166 --- build/types/core | 1 - lib/offline/storage.js | 113 +++++++++++++++++++++-------------------- lib/util/mutex.js | 52 ------------------- 3 files changed, 58 insertions(+), 108 deletions(-) delete mode 100644 lib/util/mutex.js diff --git a/build/types/core b/build/types/core index 94d18a7991..27caffd2ea 100644 --- a/build/types/core +++ b/build/types/core @@ -84,7 +84,6 @@ +../../lib/util/mp4_box_parsers.js +../../lib/util/mp4_parser.js +../../lib/util/multi_map.js -+../../lib/util/mutex.js +../../lib/util/networking.js +../../lib/util/object_utils.js +../../lib/util/operation_manager.js diff --git a/lib/offline/storage.js b/lib/offline/storage.js index 1b6c0f7690..744b2fb493 100644 --- a/lib/offline/storage.js +++ b/lib/offline/storage.js @@ -27,7 +27,6 @@ goog.require('shaka.util.Error'); goog.require('shaka.util.IDestroyable'); goog.require('shaka.util.Iterables'); goog.require('shaka.util.MimeUtils'); -goog.require('shaka.util.Mutex'); goog.require('shaka.util.Platform'); goog.require('shaka.util.PlayerConfiguration'); goog.require('shaka.util.StreamUtils'); @@ -196,7 +195,7 @@ shaka.offline.Storage = class { goog.asserts.assert(typeof(config) == 'object', 'Should be an object!'); goog.asserts.assert( - this.config_, 'Cannot reconfigure stroage after calling destroy.'); + this.config_, 'Cannot reconfigure storage after calling destroy.'); return shaka.util.PlayerConfiguration.mergeConfigObjects( /* destination= */ this.config_, /* updates= */ config ); } @@ -441,36 +440,34 @@ shaka.offline.Storage = class { async downloadSegments_( toDownload, manifestId, manifestDB, downloader, config, storage, manifest, drmEngine) { + let pendingManifestUpdates = {}; + let pendingDataSize = 0; + /** * @param {!Array.} toDownload * @param {boolean} updateDRM */ const download = async (toDownload, updateDRM) => { - const throwIfAbortedFn = () => { - this.ensureNotDestroyed_(); - }; for (const download of toDownload) { - /** @param {?BufferSource} data */ - let data; const request = download.makeSegmentRequest(config); const estimateId = download.estimateId; const isInitSegment = download.isInitSegment; - const onDownloaded = (d) => { - data = d; - return Promise.resolve(); - }; - downloader.queue(download.groupId, - request, estimateId, isInitSegment, onDownloaded); - downloader.queueWork(download.groupId, async () => { - goog.asserts.assert(data, 'We should have loaded data by now'); - goog.asserts.assert(data instanceof ArrayBuffer, - 'The data should be an ArrayBuffer'); + const onDownloaded = async (data) => { + // Store the data. + const dataKeys = await storage.addSegments([{data}]); + this.ensureNotDestroyed_(); + + // Store the necessary update to the manifest, to be processed later. const ref = /** @type {!shaka.media.SegmentReference} */ ( download.ref); - manifestDB = (await shaka.offline.Storage.assignStreamToManifest( - manifestId, ref, {data}, throwIfAbortedFn)) || manifestDB; - }); + const id = shaka.offline.DownloadInfo.idForSegmentRef(ref); + pendingManifestUpdates[id] = dataKeys[0]; + pendingDataSize += data.byteLength; + }; + + downloader.queue(download.groupId, + request, estimateId, isInitSegment, onDownloaded); } await downloader.waitToFinish(); @@ -478,8 +475,8 @@ shaka.offline.Storage = class { // Re-store the manifest, to attach session IDs. // These were (maybe) discovered inside the downloader; we can only add // them now, at the end, since the manifestDB is in flux during the - // process of downloading and storing, and assignStreamToManifest does - // not know about the DRM engine. + // process of downloading and storing, and assignSegmentsToManifest + // does not know about the DRM engine. this.ensureNotDestroyed_(); this.setManifestDrmFields_(manifest, manifestDB, drmEngine, config); await storage.updateManifest(manifestId, manifestDB); @@ -496,12 +493,30 @@ shaka.offline.Storage = class { await download(toDownload.filter((info) => info.isInitSegment), true); this.ensureNotDestroyed_(); toDownload = toDownload.filter((info) => !info.isInitSegment); + + // Copy these and reset them now, before calling await. + const manifestUpdates = pendingManifestUpdates; + const dataSize = pendingDataSize; + pendingManifestUpdates = {}; + pendingDataSize = 0; + + manifestDB = + (await shaka.offline.Storage.assignSegmentsToManifest( + manifestId, manifestUpdates, dataSize, + () => this.ensureNotDestroyed_())) || manifestDB; + this.ensureNotDestroyed_(); } if (!usingBgFetch) { await download(toDownload, false); this.ensureNotDestroyed_(); + manifestDB = + (await shaka.offline.Storage.assignSegmentsToManifest( + manifestId, pendingManifestUpdates, pendingDataSize, + () => this.ensureNotDestroyed_())) || manifestDB; + this.ensureNotDestroyed_(); + goog.asserts.assert( !manifestDB.isIncomplete, 'The manifest should be complete by now'); } else { @@ -529,48 +544,33 @@ shaka.offline.Storage = class { } /** - * Load the given manifest, modifies it by assigning the given data to the - * segments corresponding to "ref", then re-stores the manifest. - * The parts of this function that modify the manifest are protected by a - * mutex, to prevent race conditions; specifically, it prevents two parallel - * instances of this method from both loading the manifest into memory at the - * same time, which would result in the slower/later call overwriting the - * changes of the other. + * Load the given manifest, assigns database key to all the segments, then + * stores the updated manifest. + * + * It is up to the caller to ensure that this method is not called + * concurrently on the same manifest. * * @param {number} manifestId - * @param {!shaka.media.SegmentReference} ref - * @param {shaka.extern.SegmentDataDB} data + * @param {!Object.} manifestUpdates + * @param {number} dataSizeUpdate * @param {function()} throwIfAbortedFn A function that should throw if the * download has been aborted. * @return {!Promise.} */ - static async assignStreamToManifest(manifestId, ref, data, throwIfAbortedFn) { + static async assignSegmentsToManifest( + manifestId, manifestUpdates, dataSizeUpdate, throwIfAbortedFn) { /** @type {shaka.offline.StorageMuxer} */ const muxer = new shaka.offline.StorageMuxer(); - const idForRef = shaka.offline.DownloadInfo.idForSegmentRef(ref); let manifestUpdated = false; - let dataKey; let activeHandle; /** @type {!shaka.extern.ManifestDB} */ let manifestDB; - let mutexId = 0; - try { await muxer.init(); activeHandle = await muxer.getActive(); - // Store the data. - const dataKeys = await activeHandle.cell.addSegments([data]); - dataKey = dataKeys[0]; - throwIfAbortedFn(); - - // Acquire the mutex before accessing the manifest, since there could be - // multiple instances of this method running at once. - mutexId = await shaka.offline.Storage.mutex_.acquire(); - throwIfAbortedFn(); - // Load the manifest. const manifests = await activeHandle.cell.getManifests([manifestId]); throwIfAbortedFn(); @@ -580,19 +580,25 @@ shaka.offline.Storage = class { let complete = true; for (const stream of manifestDB.streams) { for (const segment of stream.segments) { - if (segment.pendingSegmentRefId == idForRef) { + let dataKey = segment.pendingSegmentRefId ? + manifestUpdates[segment.pendingSegmentRefId] : null; + if (dataKey != null) { segment.dataKey = dataKey; // Now that the segment has been associated with the appropriate // dataKey, the pendingSegmentRefId is no longer necessary. segment.pendingSegmentRefId = undefined; } - if (segment.pendingInitSegmentRefId == idForRef) { + + dataKey = segment.pendingInitSegmentRefId ? + manifestUpdates[segment.pendingInitSegmentRefId] : null; + if (dataKey != null) { segment.initSegmentKey = dataKey; // Now that the init segment has been associated with the // appropriate initSegmentKey, the pendingInitSegmentRefId is no // longer necessary. segment.pendingInitSegmentRefId = undefined; } + if (segment.pendingSegmentRefId) { complete = false; } @@ -603,7 +609,7 @@ shaka.offline.Storage = class { } // Update the size of the manifest. - manifestDB.size += data.data.byteLength; + manifestDB.size += dataSizeUpdate; // Mark the manifest as complete, if all segments are downloaded. if (complete) { @@ -617,17 +623,17 @@ shaka.offline.Storage = class { } catch (e) { await shaka.offline.Storage.cleanStoredManifest(manifestId); - if (activeHandle && !manifestUpdated && dataKey) { + if (activeHandle && !manifestUpdated) { + const dataKeys = Object.values(manifestUpdates); // The cleanStoredManifest method will not "see" any segments that have // been downloaded but not assigned to the manifest yet. So un-store // them separately. - await activeHandle.cell.removeSegments([dataKey], (key) => {}); + await activeHandle.cell.removeSegments(dataKeys, (key) => {}); } throw e; } finally { await muxer.destroy(); - shaka.offline.Storage.mutex_.release(mutexId); } return manifestDB; } @@ -1629,9 +1635,6 @@ shaka.offline.Storage = class { } }; -/** @private {!shaka.util.Mutex} */ -shaka.offline.Storage.mutex_ = new shaka.util.Mutex(); - shaka.offline.Storage.defaultSystemIds_ = new Map() .set('org.w3.clearkey', '1077efecc0b24d02ace33c1e52e2fb4b') .set('com.widevine.alpha', 'edef8ba979d64acea3c827dcd51d21ed') diff --git a/lib/util/mutex.js b/lib/util/mutex.js deleted file mode 100644 index afb22f9b95..0000000000 --- a/lib/util/mutex.js +++ /dev/null @@ -1,52 +0,0 @@ -/*! @license - * Shaka Player - * Copyright 2016 Google LLC - * SPDX-License-Identifier: Apache-2.0 - */ - -goog.provide('shaka.util.Mutex'); - - -/** - * @summary A simple mutex. - */ -shaka.util.Mutex = class { - /** Creates the mutex. */ - constructor() { - /** @private {!Array.} */ - this.waiting_ = []; - - /** @private {number} */ - this.nextMutexId_ = 0; - - /** @private {number} */ - this.acquiredMutexId_ = 0; - } - - /** @return {!Promise.} mutexId */ - async acquire() { - const mutexId = ++this.nextMutexId_; - if (!this.acquiredMutexId_) { - this.acquiredMutexId_ = mutexId; - } else { - await (new Promise((resolve, reject) => { - this.waiting_.push(() => { - this.acquiredMutexId_ = mutexId; - resolve(); - }); - })); - } - return mutexId; - } - - /** @param {number} mutexId */ - release(mutexId) { - if (mutexId == this.acquiredMutexId_) { - this.acquiredMutexId_ = 0; - if (this.waiting_.length > 0) { - const callback = this.waiting_.shift(); - callback(); - } - } - } -};