aws-s3-multipart: retry signature request - upload-error event not triggered on loss of network connection - not retrying requests #4908

Open · JohnDennehy101 opened this issue Feb 2, 2024 · 4 comments
Labels: AWS S3 (Plugin that handles uploads to Amazon AWS S3), Bug

JohnDennehy101 commented Feb 2, 2024

Initial checklist

  • I understand this is a bug report and questions should be posted in the Community Forum
  • I searched issues and couldn’t find anything (or linked relevant results below)

Link to runnable example

No response

Steps to reproduce

An overview of our current configuration is below.

The front-end React application that end users use to provide files for upload has the following dependencies:

"@uppy/aws-s3-multipart": "^3.7.0",
"@uppy/core": "^3.6.1",
"@uppy/dashboard": "^3.6.0",
"@uppy/drag-drop": "^3.0.3",
"@uppy/file-input": "^3.0.4",
"@uppy/progress-bar": "^3.0.4",
"@uppy/react": "^3.1.4",`

      const initializeAndSetUppy = async () => {
        const uppy = new Uppy({
          meta: { type: "avatar" },
          restrictions: {
            maxNumberOfFiles: 1,
            allowedFileTypes: [
              "application/zip",
              "application/x-zip-compressed",
              "application/x-rar-compressed",
              "text/plain",
            ],
          },
          autoProceed: true,
        });
        const accessToken = await sec.getAccessTokenSilently();

        uppy.use(AwsS3Multipart, {
          limit: 5,
          retryDelays: [0, 1000, 3000, 5000, 30000, 60000],
          companionUrl: companion,
          serverHeaders: { authorization: "Bearer " + accessToken },
          companionHeaders: { Authorization: "Bearer " + accessToken },
          getChunkSize: () => {
            return 20000000;
          },
          contentDisposition: "attachment",
        });

        uppy.on("complete", (result) => {
          dispatch(setUploadComplete(true));
          SetUploadStatus(true);
          onUploadChange(
            result.successful,
            result.successful[0].s3Multipart.key,
            true
          );
        });

        uppy.on("upload-error", (file, error) => {


          dispatch(setError(true));
          SetUploadStatus(false);
          if (error.isNetworkError) {
            // Let your users know that file upload could have failed
            // due to firewall or ISP issues
            dispatch(
              setAlertMessage({
                message:
                  "There is a network error. Please check your internet connection and try again later.",
                type: "failure",
                autoHideDuration: 20000,
              })
            );
          } else if (error.isAuthError) {
            logout({ returnTo: import.meta.env.VITE_APP_URL });
          } else {
            dispatch(
              setAlertMessage({
                message: "Cannot upload file",
                type: "failure",
                autoHideDuration: 20000,
              })
            );
          }
        });

        uppy.on("upload-progress", async (file, progress) => {
          setLoadingFile();
        });

        uppy.on("file-added", async (file) => {
          dispatch(setFileName(file.name));
        });

        uppy.on("file-removed", async (file) => {
          setInitialStatus();
          onUploadChange(file, "", false);
        });

        dispatch(setUppy(uppy));
      };

This is used in conjunction with Uppy Companion on the back-end.
The Uppy package used in this service is:

"@uppy/companion": "^4.9.0",

The basic configuration there is:

module.exports = ({ config }) => {
  const options = {
    s3: {
      getKey: (req, filename, metadata) => {
        return s3name(filename, metadata);
      },
      key: config.aws.access_id,
      secret: config.aws.access_key,
      bucket: config.aws.s3_bucket,
      region: config.aws.region,
      endpoint: "https://s3." + config.aws.region + ".amazonaws.com",
      signatureVersion: "v4",
      acl: "private",
      expires: 2 * 60 * 60, // Give it 2 X 60 minutes to account for very slow connections
    },
    uploadUrls: [
      "https://" +
        config.aws.s3_bucket +
        ".s3." +
        config.aws.region +
        ".amazonaws.com",
    ],
    server: { host: config.api.port, path: config.aws.lb_path },
    filePath: ".", // Not needed for s3 but it insists
    secret: config.uppy.secret, // Not for S3 either but it needs something set
    debug: false,
  };
  const { app: companionApp } = companion.app(options);
  return companionApp;
};
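
The s3name function referenced in getKey above is our own helper and is not shown here. A minimal stand-in, purely for illustration, might look like this:

// Illustrative stand-in for the s3name helper used in getKey above.
// Assumption: prefix the original filename with a random UUID to avoid key
// collisions; the real helper may also use the metadata argument.
const crypto = require("node:crypto");

function s3name(filename, metadata) {
  // metadata is unused in this sketch
  return `${crypto.randomUUID()}-${filename}`;
}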

I have never had an issue with Uppy before and find it a great open-source project. I only spotted this because one of our clients mentioned an upload was hanging indefinitely in the browser; after a lot of debugging I was able to narrow it down to a code change in the aws-s3-multipart package.

Expected behavior

The upload-error event should be triggered once the upload has failed and the retryDelays provided to AwsS3Multipart have been exhausted.

Actual behavior

The browser hangs indefinitely and the upload-error event is not triggered on loss of network connection. It looks to me like the requests are not being retried according to the provided retryDelays configuration.
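
As a stop-gap on the client, a watchdog on upload-progress events can at least surface the hang to users. This is only a sketch, assuming access to the uppy instance created above; the timeout value and the cancelAll() behaviour are illustrative choices, not our production code:

// Illustrative stall watchdog: if no upload-progress event arrives for
// STALL_TIMEOUT_MS, assume the upload is stuck and surface an error to the UI.
const STALL_TIMEOUT_MS = 2 * 60 * 1000; // assumption: tune per connection profile
let stallTimer;

const resetStallTimer = () => {
  clearTimeout(stallTimer);
  stallTimer = setTimeout(() => {
    uppy.log("Upload appears stalled: no progress for 2 minutes");
    uppy.cancelAll(); // or prompt the user to retry instead of cancelling
  }, STALL_TIMEOUT_MS);
};

uppy.on("upload", resetStallTimer);
uppy.on("upload-progress", resetStallTimer);
uppy.on("complete", () => clearTimeout(stallTimer));
uppy.on("upload-error", () => clearTimeout(stallTimer));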

I have traced this issue to this change - #4691

I have been able to resolve this by reverting the code changes in #4691 in the aws-s3-multipart package.
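
For reference, this is roughly how a vendored copy of the plugin can be swapped in; the ./AwsS3MultipartPatched.js file name and the companionUrl below are placeholders, not our real values:

// Hypothetical wiring: use a local copy of the plugin (with the #4691 changes
// reverted) in place of the published @uppy/aws-s3-multipart package.
import Uppy from "@uppy/core";
import AwsS3MultipartPatched from "./AwsS3MultipartPatched.js";

const uppy = new Uppy();
uppy.use(AwsS3MultipartPatched, {
  limit: 5,
  retryDelays: [0, 1000, 3000, 5000, 30000, 60000],
  companionUrl: "https://companion.example.com",
});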

When I use the below code (with the changes from #4691 reverted), the upload-error event is triggered after all of the retryDelays have been exhausted.

The full plugin code (with #4691 reverted):
import BasePlugin from "@uppy/core/lib/BasePlugin.js";
import { RequestClient } from "@uppy/companion-client";
import EventManager from "@uppy/utils/lib/EventManager";
import { RateLimitedQueue } from "@uppy/utils/lib/RateLimitedQueue";
import {
  filterNonFailedFiles,
  filterFilesToEmitUploadStarted,
} from "@uppy/utils/lib/fileFilters";
import { createAbortError } from "@uppy/utils/lib/AbortController";

import MultipartUploader, { pausingUploadReason } from "./MultipartUploader.js";
import createSignedURL from "./createSignedURL.js";

function assertServerError(res) {
  if (res && res.error) {
    const error = new Error(res.message);
    Object.assign(error, res.error);
    throw error;
  }
  return res;
}

function removeMetadataFromURL(urlString) {
  const urlObject = new URL(urlString);
  urlObject.search = "";
  urlObject.hash = "";
  return urlObject.href;
}

/**
 * Computes the expiry time for a request signed with temporary credentials. If
 * no expiration was provided, or an invalid value (e.g. in the past) is
 * provided, undefined is returned. This function assumes the client clock is in
 * sync with the remote server, which is a requirement for the signature to be
 * validated for AWS anyway.
 *
 * @param {import('../types/index.js').AwsS3STSResponse['credentials']} credentials
 * @returns {number | undefined}
 */
function getExpiry(credentials) {
  const expirationDate = credentials.Expiration;
  if (expirationDate) {
    const timeUntilExpiry = Math.floor(
      (new Date(expirationDate) - Date.now()) / 1000
    );
    if (timeUntilExpiry > 9) {
      return timeUntilExpiry;
    }
  }
  return undefined;
}

function getAllowedMetadata({ meta, allowedMetaFields, querify = false }) {
  if (!meta) return {};

  const metaFields = allowedMetaFields ?? Object.keys(meta);

  return Object.fromEntries(
    metaFields
      .filter((key) => meta[key] != null)
      .map((key) => {
        const realKey = querify ? `metadata[${key}]` : key;
        const value = String(meta[key]);
        return [realKey, value];
      })
  );
}

function throwIfAborted(signal) {
  if (signal?.aborted) {
    throw createAbortError("The operation was aborted", {
      cause: signal.reason,
    });
  }
}

class HTTPCommunicationQueue {
  #abortMultipartUpload;

  #cache = new WeakMap();

  #createMultipartUpload;

  #fetchSignature;

  #getUploadParameters;

  #listParts;

  #previousRetryDelay;

  #requests;

  #retryDelayIterator;

  #sendCompletionRequest;

  #setS3MultipartState;

  #uploadPartBytes;

  #getFile;

  constructor(requests, options, setS3MultipartState, getFile) {
    this.#requests = requests;
    this.#setS3MultipartState = setS3MultipartState;
    this.#getFile = getFile;
    this.setOptions(options);
  }

  setOptions(options) {
    const requests = this.#requests;

    if ("abortMultipartUpload" in options) {
      this.#abortMultipartUpload = requests.wrapPromiseFunction(
        options.abortMultipartUpload,
        { priority: 1 }
      );
    }
    if ("createMultipartUpload" in options) {
      this.#createMultipartUpload = requests.wrapPromiseFunction(
        options.createMultipartUpload,
        { priority: -1 }
      );
    }
    if ("signPart" in options) {
      this.#fetchSignature = requests.wrapPromiseFunction(options.signPart);
    }
    if ("listParts" in options) {
      this.#listParts = requests.wrapPromiseFunction(options.listParts);
    }
    if ("completeMultipartUpload" in options) {
      this.#sendCompletionRequest = requests.wrapPromiseFunction(
        options.completeMultipartUpload,
        { priority: 1 }
      );
    }
    if ("retryDelays" in options) {
      this.#retryDelayIterator = options.retryDelays?.values();
    }
    if ("uploadPartBytes" in options) {
      this.#uploadPartBytes = requests.wrapPromiseFunction(
        options.uploadPartBytes,
        { priority: Infinity }
      );
    }
    if ("getUploadParameters" in options) {
      this.#getUploadParameters = requests.wrapPromiseFunction(
        options.getUploadParameters
      );
    }
  }

  async #shouldRetry(err) {
    const requests = this.#requests;
    const status = err?.source?.status;

    // TODO: this retry logic is taken out of Tus. We should have a centralized place for retrying,
    // perhaps the rate limited queue, and dedupe all plugins with that.
    if (status == null) {
      return false;
    }
    if (status === 403 && err.message === "Request has expired") {
      if (!requests.isPaused) {
        // We don't want to exhaust the retryDelayIterator as long as there is
        // more than one request in parallel, to give slower connections a chance
        // to catch up with the expiry set in Companion.
        if (requests.limit === 1 || this.#previousRetryDelay == null) {
          const next = this.#retryDelayIterator?.next();
          if (next == null || next.done) {
            return false;
          }
          // If there are more than 1 request done in parallel, the RLQ limit is
          // decreased and the failed request is requeued after waiting for a bit.
          // If there is only one request in parallel, the limit can't be
          // decreased, so we iterate over `retryDelayIterator` as we do for
          // other failures.
          // `#previousRetryDelay` caches the value so we can re-use it next time.
          this.#previousRetryDelay = next.value;
        }
        // No need to stop the other requests, we just want to lower the limit.
        requests.rateLimit(0);
        await new Promise((resolve) =>
          setTimeout(resolve, this.#previousRetryDelay)
        );
      }
    } else if (status === 429) {
      // HTTP 429 Too Many Requests => to avoid failing the whole upload, pause all requests.
      if (!requests.isPaused) {
        const next = this.#retryDelayIterator?.next();
        if (next == null || next.done) {
          return false;
        }
        requests.rateLimit(next.value);
      }
    } else if (status > 400 && status < 500 && status !== 409) {
      // HTTP 4xx, the server won't send anything, it doesn't make sense to retry
      return false;
    } else if (typeof navigator !== "undefined" && navigator.onLine === false) {
      // The navigator is offline, let's wait for it to come back online.
      if (!requests.isPaused) {
        requests.pause();
        window.addEventListener(
          "online",
          () => {
            requests.resume();
          },
          { once: true }
        );
      }
    } else {
      // Other error code means the request can be retried later.
      const next = this.#retryDelayIterator?.next();
      if (next == null || next.done) {
        return false;
      }
      await new Promise((resolve) => setTimeout(resolve, next.value));
    }
    return true;
  }

  async getUploadId(file, signal) {
    let cachedResult;
    // As the cache is updated asynchronously, there could be a race condition
    // where we just miss a new result, so we loop here until we get nothing back,
    // at which point it's our turn to create a new cache entry.
    while ((cachedResult = this.#cache.get(file.data)) != null) {
      try {
        return await cachedResult;
      } catch {
        // In case of failure, we want to ignore the cached error.
        // At this point, either there's a new cached value, or we'll exit the loop and create a new one.
      }
    }

    const promise = this.#createMultipartUpload(this.#getFile(file), signal);

    const abortPromise = () => {
      promise.abort(signal.reason);
      this.#cache.delete(file.data);
    };
    signal.addEventListener("abort", abortPromise, { once: true });
    this.#cache.set(file.data, promise);
    promise.then(
      async (result) => {
        signal.removeEventListener("abort", abortPromise);
        this.#setS3MultipartState(file, result);
        this.#cache.set(file.data, result);
      },
      () => {
        signal.removeEventListener("abort", abortPromise);
        this.#cache.delete(file.data);
      }
    );

    return promise;
  }

  async abortFileUpload(file) {
    const result = this.#cache.get(file.data);
    if (result == null) {
      // If the createMultipartUpload request never was made, we don't
      // need to send the abortMultipartUpload request.
      return;
    }
    // Remove the cache entry right away so follow-up requests do not try to
    // use the soon-to-be-aborted cached values.
    this.#cache.delete(file.data);
    this.#setS3MultipartState(file, Object.create(null));
    let awaitedResult;
    try {
      awaitedResult = await result;
    } catch {
      // If the cached result rejects, there's nothing to abort.
      return;
    }
    await this.#abortMultipartUpload(this.#getFile(file), awaitedResult);
  }

  async #nonMultipartUpload(file, chunk, signal) {
    const {
      method = "POST",
      url,
      fields,
      headers,
    } = await this.#getUploadParameters(this.#getFile(file), {
      signal,
    }).abortOn(signal);

    let body;
    const data = chunk.getData();
    if (method.toUpperCase() === "POST") {
      const formData = new FormData();
      Object.entries(fields).forEach(([key, value]) =>
        formData.set(key, value)
      );
      formData.set("file", data);
      body = formData;
    } else {
      body = data;
    }

    const { onProgress, onComplete } = chunk;

    const result = await this.#uploadPartBytes({
      signature: { url, headers, method },
      body,
      size: data.size,
      onProgress,
      onComplete,
      signal,
    }).abortOn(signal);

    return "location" in result
      ? result
      : {
          location: removeMetadataFromURL(url),
          ...result,
        };
  }

  /**
   * @param {import("@uppy/core").UppyFile} file
   * @param {import("../types/chunk").Chunk[]} chunks
   * @param {AbortSignal} signal
   * @returns {Promise<void>}
   */
  async uploadFile(file, chunks, signal) {
    throwIfAborted(signal);
    if (chunks.length === 1 && !chunks[0].shouldUseMultipart) {
      return this.#nonMultipartUpload(file, chunks[0], signal);
    }
    const { uploadId, key } = await this.getUploadId(file, signal);
    throwIfAborted(signal);
    try {
      const parts = await Promise.all(
        chunks.map((chunk, i) => this.uploadChunk(file, i + 1, chunk, signal))
      );
      throwIfAborted(signal);
      return await this.#sendCompletionRequest(
        this.#getFile(file),
        { key, uploadId, parts, signal },
        signal
      ).abortOn(signal);
    } catch (err) {
      if (err?.cause !== pausingUploadReason && err?.name !== "AbortError") {
        // We purposefully don't wait for the promise and ignore its status,
        // because we want the error `err` to bubble up ASAP to report it to the
        // user. A failure to abort is not that big of a deal anyway.
        this.abortFileUpload(file);
      }
      throw err;
    }
  }

  restoreUploadFile(file, uploadIdAndKey) {
    this.#cache.set(file.data, uploadIdAndKey);
  }

  async resumeUploadFile(file, chunks, signal) {
    throwIfAborted(signal);
    if (
      chunks.length === 1 &&
      chunks[0] != null &&
      !chunks[0].shouldUseMultipart
    ) {
      return this.#nonMultipartUpload(file, chunks[0], signal);
    }
    const { uploadId, key } = await this.getUploadId(file, signal);
    throwIfAborted(signal);
    const alreadyUploadedParts = await this.#listParts(
      this.#getFile(file),
      { uploadId, key, signal },
      signal
    ).abortOn(signal);
    throwIfAborted(signal);
    const parts = await Promise.all(
      chunks.map((chunk, i) => {
        const partNumber = i + 1;
        const alreadyUploadedInfo = alreadyUploadedParts.find(
          ({ PartNumber }) => PartNumber === partNumber
        );
        if (alreadyUploadedInfo == null) {
          return this.uploadChunk(file, partNumber, chunk, signal);
        }
        // Already uploaded chunks are set to null. If we are restoring the upload, we need to mark it as already uploaded.
        chunk?.setAsUploaded?.();
        return { PartNumber: partNumber, ETag: alreadyUploadedInfo.ETag };
      })
    );
    throwIfAborted(signal);
    return this.#sendCompletionRequest(
      this.#getFile(file),
      { key, uploadId, parts, signal },
      signal
    ).abortOn(signal);
  }

  /**
   *
   * @param {import("@uppy/core").UppyFile} file
   * @param {number} partNumber
   * @param {import("../types/chunk").Chunk} chunk
   * @param {AbortSignal} signal
   * @returns {Promise<object>}
   */
  async uploadChunk(file, partNumber, chunk, signal) {
    const { uploadId, key } = await this.getUploadId(file, signal);
    throwIfAborted(signal);

    for (;;) {
      const chunkData = chunk.getData();
      const { onProgress, onComplete } = chunk;

      const signature = await this.#fetchSignature(this.#getFile(file), {
        uploadId,
        key,
        partNumber,
        body: chunkData,
        signal,
      }).abortOn(signal);

      throwIfAborted(signal);
      try {
        return {
          PartNumber: partNumber,
          ...(await this.#uploadPartBytes({
            signature,
            body: chunkData,
            size: chunkData.size,
            onProgress,
            onComplete,
            signal,
          }).abortOn(signal)),
        };
      } catch (err) {
        if (!(await this.#shouldRetry(err))) throw err;
      }
    }
  }
}

export default class AwsS3Multipart extends BasePlugin {
  static VERSION = "3.10.0";

  #companionCommunicationQueue;

  #client;

  constructor(uppy, opts) {
    super(uppy, opts);
    this.type = "uploader";
    this.id = this.opts.id || "AwsS3Multipart";
    this.title = "AWS S3 Multipart";
    this.#client = new RequestClient(uppy, opts);

    const defaultOptions = {
      // TODO: null here means “include all”, [] means include none.
      // This is inconsistent with @uppy/aws-s3 and @uppy/transloadit
      allowedMetaFields: null,
      limit: 6,
      shouldUseMultipart: (file) => file.size !== 0, // TODO: Switch default to:
      // eslint-disable-next-line no-bitwise
      // shouldUseMultipart: (file) => file.size >> 10 >> 10 > 100,
      retryDelays: [0, 1000, 3000, 5000],
      createMultipartUpload: this.createMultipartUpload.bind(this),
      listParts: this.listParts.bind(this),
      abortMultipartUpload: this.abortMultipartUpload.bind(this),
      completeMultipartUpload: this.completeMultipartUpload.bind(this),
      getTemporarySecurityCredentials: false,
      signPart: opts?.getTemporarySecurityCredentials
        ? this.createSignedURL.bind(this)
        : this.signPart.bind(this),
      uploadPartBytes: AwsS3Multipart.uploadPartBytes,
      getUploadParameters: opts?.getTemporarySecurityCredentials
        ? this.createSignedURL.bind(this)
        : this.getUploadParameters.bind(this),
      companionHeaders: {},
    };

    this.opts = { ...defaultOptions, ...opts };
    if (opts?.prepareUploadParts != null && opts.signPart == null) {
      this.opts.signPart = async (
        file,
        { uploadId, key, partNumber, body, signal }
      ) => {
        const { presignedUrls, headers } = await opts.prepareUploadParts(file, {
          uploadId,
          key,
          parts: [{ number: partNumber, chunk: body }],
          signal,
        });
        return {
          url: presignedUrls?.[partNumber],
          headers: headers?.[partNumber],
        };
      };
    }

    /**
     * Simultaneous upload limiting is shared across all uploads with this plugin.
     *
     * @type {RateLimitedQueue}
     */
    this.requests =
      this.opts.rateLimitedQueue ?? new RateLimitedQueue(this.opts.limit);
    this.#companionCommunicationQueue = new HTTPCommunicationQueue(
      this.requests,
      this.opts,
      this.#setS3MultipartState,
      this.#getFile
    );

    this.uploaders = Object.create(null);
    this.uploaderEvents = Object.create(null);
    this.uploaderSockets = Object.create(null);
  }

  [Symbol.for("uppy test: getClient")]() {
    return this.#client;
  }

  setOptions(newOptions) {
    this.#companionCommunicationQueue.setOptions(newOptions);
    super.setOptions(newOptions);
    this.#setCompanionHeaders();
  }

  /**
   * Clean up all references for a file's upload: the MultipartUploader instance,
   * any events related to the file, and the Companion WebSocket connection.
   *
   * Set `opts.abort` to tell S3 that the multipart upload is cancelled and must be removed.
   * This should be done when the user cancels the upload, not when the upload is completed or errored.
   */
  resetUploaderReferences(fileID, opts = {}) {
    if (this.uploaders[fileID]) {
      this.uploaders[fileID].abort({ really: opts.abort || false });
      this.uploaders[fileID] = null;
    }
    if (this.uploaderEvents[fileID]) {
      this.uploaderEvents[fileID].remove();
      this.uploaderEvents[fileID] = null;
    }
    if (this.uploaderSockets[fileID]) {
      this.uploaderSockets[fileID].close();
      this.uploaderSockets[fileID] = null;
    }
  }

  // TODO: make this a private method in the next major
  assertHost(method) {
    if (!this.opts.companionUrl) {
      throw new Error(
        `Expected a \`companionUrl\` option containing a Companion address, or if you are not using Companion, a custom \`${method}\` implementation.`
      );
    }
  }

  createMultipartUpload(file, signal) {
    this.assertHost("createMultipartUpload");
    throwIfAborted(signal);

    const metadata = getAllowedMetadata({
      meta: file.meta,
      allowedMetaFields: this.opts.allowedMetaFields,
    });

    return this.#client
      .post(
        "s3/multipart",
        {
          filename: file.name,
          type: file.type,
          metadata,
        },
        { signal }
      )
      .then(assertServerError);
  }

  listParts(file, { key, uploadId }, signal) {
    this.assertHost("listParts");
    throwIfAborted(signal);

    const filename = encodeURIComponent(key);
    return this.#client
      .get(`s3/multipart/${uploadId}?key=${filename}`, { signal })
      .then(assertServerError);
  }

  completeMultipartUpload(file, { key, uploadId, parts }, signal) {
    this.assertHost("completeMultipartUpload");
    throwIfAborted(signal);

    const filename = encodeURIComponent(key);
    const uploadIdEnc = encodeURIComponent(uploadId);
    return this.#client
      .post(
        `s3/multipart/${uploadIdEnc}/complete?key=${filename}`,
        { parts },
        { signal }
      )
      .then(assertServerError);
  }

  /**
   * @type {import("../types").AwsS3STSResponse | Promise<import("../types").AwsS3STSResponse>}
   */
  #cachedTemporaryCredentials;

  async #getTemporarySecurityCredentials(options) {
    throwIfAborted(options?.signal);

    if (this.#cachedTemporaryCredentials == null) {
      // We do not await it just yet, so concurrent calls do not try to override it:
      if (this.opts.getTemporarySecurityCredentials === true) {
        this.assertHost("getTemporarySecurityCredentials");
        this.#cachedTemporaryCredentials = this.#client
          .get("s3/sts", null, options)
          .then(assertServerError);
      } else {
        this.#cachedTemporaryCredentials =
          this.opts.getTemporarySecurityCredentials(options);
      }
      this.#cachedTemporaryCredentials = await this.#cachedTemporaryCredentials;
      setTimeout(
        () => {
          // At half the time left before expiration, we clear the cache. That's
          // an arbitrary tradeoff to limit the number of requests made to the
          // remote while limiting the risk of using an expired token in case the
          // clocks are not exactly synced.
          // The HTTP cache should be configured to ensure a client doesn't request
          // more tokens than it needs, but this timeout provides a second layer of
          // security in case the HTTP cache is disabled or misconfigured.
          this.#cachedTemporaryCredentials = null;
        },
        (getExpiry(this.#cachedTemporaryCredentials.credentials) || 0) * 500
      );
    }

    return this.#cachedTemporaryCredentials;
  }

  async createSignedURL(file, options) {
    const data = await this.#getTemporarySecurityCredentials(options);
    const expires = getExpiry(data.credentials) || 604_800; // 604 800 is the max value accepted by AWS.

    const { uploadId, key, partNumber, signal } = options;

    // Return an object in the correct shape.
    return {
      method: "PUT",
      expires,
      fields: {},
      url: `${await createSignedURL({
        accountKey: data.credentials.AccessKeyId,
        accountSecret: data.credentials.SecretAccessKey,
        sessionToken: data.credentials.SessionToken,
        expires,
        bucketName: data.bucket,
        Region: data.region,
        Key: key ?? `${crypto.randomUUID()}-${file.name}`,
        uploadId,
        partNumber,
        signal,
      })}`,
      // Provide content type header required by S3
      headers: {
        "Content-Type": file.type,
      },
    };
  }

  signPart(file, { uploadId, key, partNumber, signal }) {
    this.assertHost("signPart");
    throwIfAborted(signal);

    if (uploadId == null || key == null || partNumber == null) {
      throw new Error(
        "Cannot sign without a key, an uploadId, and a partNumber"
      );
    }

    const filename = encodeURIComponent(key);
    return this.#client
      .get(`s3/multipart/${uploadId}/${partNumber}?key=${filename}`, { signal })
      .then(assertServerError);
  }

  abortMultipartUpload(file, { key, uploadId }, signal) {
    this.assertHost("abortMultipartUpload");

    const filename = encodeURIComponent(key);
    const uploadIdEnc = encodeURIComponent(uploadId);
    return this.#client
      .delete(`s3/multipart/${uploadIdEnc}?key=${filename}`, undefined, {
        signal,
      })
      .then(assertServerError);
  }

  getUploadParameters(file, options) {
    const { meta } = file;
    const { type, name: filename } = meta;
    const metadata = getAllowedMetadata({
      meta,
      allowedMetaFields: this.opts.allowedMetaFields,
      querify: true,
    });

    const query = new URLSearchParams({ filename, type, ...metadata });

    return this.#client.get(`s3/params?${query}`, options);
  }

  static async uploadPartBytes({
    signature: { url, expires, headers, method = "PUT" },
    body,
    size = body.size,
    onProgress,
    onComplete,
    signal,
  }) {
    throwIfAborted(signal);

    if (url == null) {
      throw new Error("Cannot upload to an undefined URL");
    }

    return new Promise((resolve, reject) => {
      const xhr = new XMLHttpRequest();
      xhr.open(method, url, true);
      if (headers) {
        Object.keys(headers).forEach((key) => {
          xhr.setRequestHeader(key, headers[key]);
        });
      }
      xhr.responseType = "text";
      if (typeof expires === "number") {
        xhr.timeout = expires * 1000;
      }

      function onabort() {
        xhr.abort();
      }
      function cleanup() {
        signal.removeEventListener("abort", onabort);
      }
      signal.addEventListener("abort", onabort);

      xhr.upload.addEventListener("progress", (ev) => {
        onProgress(ev);
      });

      xhr.addEventListener("abort", () => {
        cleanup();

        reject(createAbortError());
      });

      xhr.addEventListener("timeout", () => {
        cleanup();

        const error = new Error("Request has expired");
        error.source = { status: 403 };
        reject(error);
      });
      xhr.addEventListener("load", (ev) => {
        cleanup();

        if (
          ev.target.status === 403 &&
          ev.target.responseText.includes(
            "<Message>Request has expired</Message>"
          )
        ) {
          const error = new Error("Request has expired");
          error.source = ev.target;
          reject(error);
          return;
        }
        if (ev.target.status < 200 || ev.target.status >= 300) {
          const error = new Error("Non 2xx");
          error.source = ev.target;
          reject(error);
          return;
        }

        // todo make a proper onProgress API (breaking change)
        onProgress?.({ loaded: size, lengthComputable: true });

        // NOTE This must be allowed by CORS.
        const etag = ev.target.getResponseHeader("ETag");
        const location = ev.target.getResponseHeader("Location");

        if (method.toUpperCase() === "POST" && location === null) {
          // Not being able to read the Location header is not a fatal error.
          // eslint-disable-next-line no-console
          console.warn(
            "AwsS3/Multipart: Could not read the Location header. This likely means CORS is not configured correctly on the S3 Bucket. See https://uppy.io/docs/aws-s3-multipart#S3-Bucket-Configuration for instructions."
          );
        }
        if (etag === null) {
          reject(
            new Error(
              "AwsS3/Multipart: Could not read the ETag header. This likely means CORS is not configured correctly on the S3 Bucket. See https://uppy.io/docs/aws-s3-multipart#S3-Bucket-Configuration for instructions."
            )
          );
          return;
        }

        onComplete?.(etag);
        resolve({
          ETag: etag,
          ...(location ? { location } : undefined),
        });
      });

      xhr.addEventListener("error", (ev) => {
        cleanup();

        const error = new Error("Unknown error");
        error.source = ev.target;
        reject(error);
      });

      xhr.send(body);
    });
  }

  #setS3MultipartState = (file, { key, uploadId }) => {
    const cFile = this.uppy.getFile(file.id);
    if (cFile == null) {
      // file was removed from store
      return;
    }

    this.uppy.setFileState(file.id, {
      s3Multipart: {
        ...cFile.s3Multipart,
        key,
        uploadId,
      },
    });
  };

  #getFile = (file) => {
    return this.uppy.getFile(file.id) || file;
  };

  #uploadLocalFile(file) {
    return new Promise((resolve, reject) => {
      const onProgress = (bytesUploaded, bytesTotal) => {
        this.uppy.emit("upload-progress", file, {
          uploader: this,
          bytesUploaded,
          bytesTotal,
        });
      };

      const onError = (err) => {
        this.uppy.log(err);
        this.uppy.emit("upload-error", file, err);

        this.resetUploaderReferences(file.id);
        reject(err);
      };

      const onSuccess = (result) => {
        const uploadResp = {
          body: {
            ...result,
          },
          uploadURL: result.location,
        };

        this.resetUploaderReferences(file.id);

        this.uppy.emit("upload-success", this.#getFile(file), uploadResp);

        if (result.location) {
          this.uppy.log(`Download ${file.name} from ${result.location}`);
        }

        resolve();
      };

      const onPartComplete = (part) => {
        this.uppy.emit("s3-multipart:part-uploaded", this.#getFile(file), part);
      };

      const upload = new MultipartUploader(file.data, {
        // .bind to pass the file object to each handler.
        companionComm: this.#companionCommunicationQueue,

        log: (...args) => this.uppy.log(...args),
        getChunkSize: this.opts.getChunkSize
          ? this.opts.getChunkSize.bind(this)
          : null,

        onProgress,
        onError,
        onSuccess,
        onPartComplete,

        file,
        shouldUseMultipart: this.opts.shouldUseMultipart,

        ...file.s3Multipart,
      });

      this.uploaders[file.id] = upload;
      const eventManager = new EventManager(this.uppy);
      this.uploaderEvents[file.id] = eventManager;

      eventManager.onFileRemove(file.id, (removed) => {
        upload.abort();
        this.resetUploaderReferences(file.id, { abort: true });
        resolve(`upload ${removed.id} was removed`);
      });

      eventManager.onCancelAll(file.id, ({ reason } = {}) => {
        if (reason === "user") {
          upload.abort();
          this.resetUploaderReferences(file.id, { abort: true });
        }
        resolve(`upload ${file.id} was canceled`);
      });

      eventManager.onFilePause(file.id, (isPaused) => {
        if (isPaused) {
          upload.pause();
        } else {
          upload.start();
        }
      });

      eventManager.onPauseAll(file.id, () => {
        upload.pause();
      });

      eventManager.onResumeAll(file.id, () => {
        upload.start();
      });

      upload.start();
    });
  }

  // eslint-disable-next-line class-methods-use-this
  #getCompanionClientArgs(file) {
    return {
      ...file.remote.body,
      protocol: "s3-multipart",
      size: file.data.size,
      metadata: file.meta,
    };
  }

  #upload = async (fileIDs) => {
    if (fileIDs.length === 0) return undefined;

    const files = this.uppy.getFilesByIds(fileIDs);
    const filesFiltered = filterNonFailedFiles(files);
    const filesToEmit = filterFilesToEmitUploadStarted(filesFiltered);

    this.uppy.emit("upload-start", filesToEmit);

    const promises = filesFiltered.map((file) => {
      if (file.isRemote) {
        const getQueue = () => this.requests;
        this.#setResumableUploadsCapability(false);
        const controller = new AbortController();

        const removedHandler = (removedFile) => {
          if (removedFile.id === file.id) controller.abort();
        };
        this.uppy.on("file-removed", removedHandler);

        const uploadPromise = this.uppy
          .getRequestClientForFile(file)
          .uploadRemoteFile(file, this.#getCompanionClientArgs(file), {
            signal: controller.signal,
            getQueue,
          });

        this.requests.wrapSyncFunction(
          () => {
            this.uppy.off("file-removed", removedHandler);
          },
          { priority: -1 }
        )();

        return uploadPromise;
      }

      return this.#uploadLocalFile(file);
    });

    const upload = await Promise.all(promises);
    // After the upload is done, another upload may happen with only local files.
    // We reset the capability so that the next upload can use resumable uploads.
    this.#setResumableUploadsCapability(true);
    return upload;
  };

  #setCompanionHeaders = () => {
    this.#client.setCompanionHeaders(this.opts.companionHeaders);
  };

  #setResumableUploadsCapability = (boolean) => {
    const { capabilities } = this.uppy.getState();
    this.uppy.setState({
      capabilities: {
        ...capabilities,
        resumableUploads: boolean,
      },
    });
  };

  #resetResumableCapability = () => {
    this.#setResumableUploadsCapability(true);
  };

  install() {
    this.#setResumableUploadsCapability(true);
    this.uppy.addPreProcessor(this.#setCompanionHeaders);
    this.uppy.addUploader(this.#upload);
    this.uppy.on("cancel-all", this.#resetResumableCapability);
  }

  uninstall() {
    this.uppy.removePreProcessor(this.#setCompanionHeaders);
    this.uppy.removeUploader(this.#upload);
    this.uppy.off("cancel-all", this.#resetResumableCapability);
  }
}
Murderlon added the AWS S3 label on Feb 5, 2024
JohnDennehy101 (Author) commented:

Hi @Murderlon @aduh95, thanks for assigning this for further investigation. I'm just wondering if you have had a chance to take a closer look, and whether this issue will be resolved in a future release?

macmillen commented:

This is also happening to me. I blocked the outgoing AWS connection to force a timeout error and test the upload-error hook, but the hook never fires.

aduh95 (Member) commented May 3, 2024

Hello, I'm having trouble reproducing this. Are you able to reproduce the issue consistently, or is there some flakiness involved? Are you trying to upload local files, or files from a remote provider (e.g. Google Drive)? What is the behavior in DevTools? Do you see an endless stream of requests being sent (and at what interval), or is it just one request pending indefinitely?

For the record, I tried to reproduce using yarn dev:with-companion:

  • I added uppyDashboard.on('upload-error', function(...args) {debugger}) in private/dev/Dashboard.js to detect if the event is emitted.
  • I configured Firefox DevTools to block all https://s3.us-east-1.amazonaws.com/* requests.
  • I started to upload a local file.

> This is also happening to me. I blocked the outgoing AWS connection to force a timeout error and test the upload-error hook, but the hook never fires.

Can you clarify if you block the outgoing connections from the client or from Companion side?

JohnDennehy101 (Author) commented May 9, 2024

Hi @aduh95,

I have resolved this issue by hardcoding a custom plugin using the code before the change I highlighted was implemented.

I was able to consistently reproduce the issue at the time. Our use case is uploading large local files to S3 using pre-signed URLs returned from Companion.

On loss of network connection, the pending requests remain in that state indefinitely and the browser hangs (with the upload progress spinner still showing the upload as in progress).

We do not block the outgoing connections from either side.
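
For anyone trying to verify a fix, the failure is easy to observe with a couple of listeners: start a large multipart upload, switch DevTools to Offline, and check whether upload-error ever fires. The listeners and console output below are purely illustrative:

// Illustrative check only: with the published package we never see either
// log after the network drops; with the reverted code, upload-error fires
// once the retryDelays are exhausted.
uppy.on("upload-error", (file, error) => {
  console.log("upload-error fired for", file?.name, error);
});
uppy.on("error", (error) => {
  console.log("uppy error event:", error);
});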
