Skip to content

Commit

Permalink
D1: using new polling endpoint for larger exports (#5585)
Browse files Browse the repository at this point in the history
  • Loading branch information
geelen committed Apr 12, 2024
1 parent 69577b9 commit 22f5841
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 33 deletions.
5 changes: 5 additions & 0 deletions .changeset/warm-bears-marry.md
@@ -0,0 +1,5 @@
---
"wrangler": patch
---

Updates `wrangler d1 export` to handle larger DBs more efficiently
59 changes: 48 additions & 11 deletions packages/wrangler/src/__tests__/d1/export.test.ts
Expand Up @@ -42,24 +42,61 @@ describe("execute", () => {
{ id: "IG-88", account: { id: "1701", name: "enterprise" } },
]);
const mockSqlContent = "PRAGMA defer_foreign_keys=TRUE;";

msw.use(
rest.post(
"*/accounts/:accountId/d1/database/:databaseId/export",
async (_req, res, ctx) => {
return res(
ctx.status(200),
ctx.json({
result: { signedUrl: "https://example.com" },
success: true,
errors: [],
messages: [],
})
);
async (req, res, ctx) => {
const body = await req.json();

// First request, initiates a new task
if (!body.currentBookmark) {
return res(
ctx.status(202),
ctx.json({
success: true,
result: {
success: true,
type: "export",
at_bookmark: "yyyy",
status: "active",
messages: [
"Generating xxxx-yyyy.sql",
"Uploaded part 2", // out-of-order uploads ok
"Uploaded part 1",
],
},
})
);
}
// Subsequent request, sees that it is complete
else
return res(
ctx.status(200),
ctx.json({
success: true,
result: {
success: true,
type: "export",
at_bookmark: "yyyy",
status: "complete",
result: {
filename: "xxxx-yyyy.sql",
signedUrl: "https://example.com/xxxx-yyyy.sql",
},
messages: [
"Uploaded part 3",
"Uploaded part 4",
"Finished uploading xxxx-yyyy.sql in 4 parts.",
],
},
})
);
}
)
);
msw.use(
rest.get("https://example.com", async (req, res, ctx) => {
rest.get("https://example.com/xxxx-yyyy.sql", async (req, res, ctx) => {
return res(ctx.status(200), ctx.text(mockSqlContent));
})
);
Expand Down
108 changes: 86 additions & 22 deletions packages/wrangler/src/d1/export.ts
@@ -1,10 +1,12 @@
import fs from "node:fs/promises";
import chalk from "chalk";
import { fetch } from "undici";
import { printWranglerBanner } from "..";
import { fetchResult } from "../cfetch";
import { readConfig } from "../config";
import { UserError } from "../errors";
import { logger } from "../logger";
import { APIError } from "../parse";
import { requireAuth } from "../user";
import { Name } from "./options";
import { getDatabaseByNameOrBinding } from "./utils";
Expand Down Expand Up @@ -80,9 +82,21 @@ export const Handler = async (args: HandlerOptions): Promise<void> => {
return result;
};

type ExportMetadata = {
signedUrl: string;
};
// One response from the D1 export polling endpoint.
// `status` discriminates the union: while the export task is "active" (or has
// hit "error") there is no download available yet; once "complete" the server
// additionally returns the dump's filename and a signed download URL (stated
// elsewhere in this file to be valid for one hour).
type PollingResponse = {
	success: true;
	type: "export";
	// Opaque task bookmark — echoed back as `currentBookmark` on the next poll
	// so the server can resume reporting from where the last response left off.
	at_bookmark: string;
	// Human-readable progress lines (e.g. "Uploaded part N"); also used as
	// notes when surfacing an APIError on the "error" status.
	messages: string[];
	errors: string[];
} & (
	| {
			status: "active" | "error";
	  }
	| {
			status: "complete";
			result: { filename: string; signedUrl: string };
	  }
);

async function exportRemotely(
config: Config,
Expand All @@ -101,26 +115,76 @@ async function exportRemotely(

logger.log(`🌀 Executing on remote database ${name} (${db.uuid}):`);
logger.log(`🌀 Creating export...`);
const metadata = await fetchResult<ExportMetadata>(
`/accounts/${accountId}/d1/database/${db.uuid}/export`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
outputFormat: "file",
dumpOptions: {
noSchema,
noData,
tables,
},
}),
}
);
const dumpOptions = {
noSchema,
noData,
tables,
};

const finalResponse = await pollExport(accountId, db, dumpOptions, undefined);

logger.log(`🌀 Downloading SQL to ${output}`);
const contents = await fetch(metadata.signedUrl);
if (finalResponse.status !== "complete")
throw new APIError({ text: `D1 reset before export completed!` });

logger.log(`🌀 Downloading SQL to ${output}...`);
logger.log(
chalk.gray(
`You can also download your export from the following URL manually. This link will be valid for one hour: ${finalResponse.result.signedUrl}`
)
);
const contents = await fetch(finalResponse.result.signedUrl);
await fs.writeFile(output, contents.body || "");
logger.log(`Done!`);
}

/**
 * Polls the D1 export endpoint until the export task reaches a terminal state.
 *
 * Each POST either starts a new task (no `currentBookmark`) or resumes
 * reporting on an existing one. Progress messages are logged as they arrive.
 *
 * @param accountId Cloudflare account the database belongs to.
 * @param db Target database (only `uuid` is read here).
 * @param dumpOptions Table/schema/data filters forwarded to the export API.
 * @param currentBookmark Bookmark from a previous poll, or `undefined` to
 *   start a new export task.
 * @param num_parts_uploaded Running count of parts already reported, used to
 *   renumber upload messages sequentially.
 * @returns The final response, whose `status` is `"complete"`.
 * @throws {Error} When the API reports `success: false`.
 * @throws {APIError} When the export task ends with `status: "error"`.
 */
async function pollExport(
	accountId: string,
	db: Database,
	dumpOptions: {
		tables: string[];
		noSchema?: boolean;
		noData?: boolean;
	},
	currentBookmark: string | undefined,
	num_parts_uploaded = 0
): Promise<PollingResponse> {
	let bookmark = currentBookmark;
	let uploadedParts = num_parts_uploaded;

	// Iterate rather than recurse: one POST per pass until a terminal status.
	for (;;) {
		const response = await fetchResult<
			PollingResponse | { success: false; error: string }
		>(`/accounts/${accountId}/d1/database/${db.uuid}/export`, {
			method: "POST",
			body: JSON.stringify({
				outputFormat: "polling",
				dumpOptions,
				currentBookmark: bookmark,
			}),
		});

		if (!response.success) throw new Error(response.error);

		for (const line of response.messages) {
			if (line.startsWith(`Uploaded part`)) {
				// Part numbers can be reported as complete out-of-order which looks
				// confusing to a user. But their ID has no special meaning, so just
				// make them sequential.
				uploadedParts += 1;
				logger.log(`🌀 Uploaded part ${uploadedParts}`);
			} else {
				logger.log(`🌀 ${line}`);
			}
		}

		if (response.status === "complete") return response;
		if (response.status === "error") {
			throw new APIError({
				text: response.errors.join("\n"),
				notes: response.messages.map((text) => ({ text })),
			});
		}
		// Still "active" — poll again from the bookmark the server just reported.
		bookmark = response.at_bookmark;
	}
}

0 comments on commit 22f5841

Please sign in to comment.