Skip to content

Commit

Permalink
Migrate to Async NIOFileIO APIs (#3167)
Browse files Browse the repository at this point in the history
* Add tests

* Update NIO dependency

* Migrate over to async APIs for FileIO

* Add AsyncSequence-based file read method (#3170)

* Add AsyncSequence-based file read method

* Add `NIOFileSystem` dependency to standard manifest

* Update test

* Add long test file

* Migrate new API to use NIOFileSystem

* Fix mess up

---------

Co-authored-by: Paul Toffoloni <69189821+ptoffy@users.noreply.github.com>
  • Loading branch information
0xTim and ptoffy committed Apr 24, 2024
1 parent 3e3d65b commit 4c80aab
Show file tree
Hide file tree
Showing 6 changed files with 755 additions and 63 deletions.
4 changes: 3 additions & 1 deletion Package.swift
Expand Up @@ -34,7 +34,7 @@ let package = Package(
.package(url: "https://github.com/swift-server/swift-backtrace.git", from: "1.1.1"),

// Event-driven network application framework for high performance protocol servers & clients, non-blocking.
.package(url: "https://github.com/apple/swift-nio.git", from: "2.62.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.63.0"),

// Bindings to OpenSSL-compatible libraries for TLS support in SwiftNIO
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
Expand Down Expand Up @@ -92,6 +92,8 @@ let package = Package(
.product(name: "WebSocketKit", package: "websocket-kit"),
.product(name: "MultipartKit", package: "multipart-kit"),
.product(name: "Atomics", package: "swift-atomics"),

.product(name: "_NIOFileSystem", package: "swift-nio"),
]),

// Development
Expand Down
4 changes: 3 additions & 1 deletion Package@swift-5.9.swift
Expand Up @@ -31,7 +31,7 @@ let package = Package(
.package(url: "https://github.com/vapor/routing-kit.git", from: "4.9.0"),

// Event-driven network application framework for high performance protocol servers & clients, non-blocking.
.package(url: "https://github.com/apple/swift-nio.git", from: "2.62.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.63.0"),

// Bindings to OpenSSL-compatible libraries for TLS support in SwiftNIO
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
Expand Down Expand Up @@ -90,6 +90,8 @@ let package = Package(
.product(name: "WebSocketKit", package: "websocket-kit"),
.product(name: "MultipartKit", package: "multipart-kit"),
.product(name: "Atomics", package: "swift-atomics"),

.product(name: "_NIOFileSystem", package: "swift-nio"),
],
swiftSettings: [.enableExperimentalFeature("StrictConcurrency=complete")]
),
Expand Down
50 changes: 0 additions & 50 deletions Sources/Vapor/Concurrency/FileIO+Concurrency.swift

This file was deleted.

126 changes: 115 additions & 11 deletions Sources/Vapor/Utilities/FileIO.swift
@@ -1,5 +1,6 @@
import Foundation
import NIOCore
import NIOFileSystem
import NIOHTTP1
import NIOPosix
import Logging
Expand All @@ -22,21 +23,19 @@ extension Request {
///
/// It can read files, both in their entirety and chunked.
///
/// let fileio = try c.make(FileIO.self)
///
/// fileio.readFile(at: "/path/to/file.txt") { chunk in
/// req.fileio.readFile(at: "/path/to/file.txt") { chunk in
/// print(chunk) // part of file
/// }
///
/// fileio.collectFile(at: "/path/to/file.txt").map { file in
/// req.fileio.collectFile(at: "/path/to/file.txt").map { file in
/// print(file) // entire file
/// }
///
/// It can also create streaming HTTP responses.
///
/// let fileio = try c.make(FileIO.self)
/// router.get("file-stream") { req -> Response in
/// return fileio.streamFile(at: "/path/to/file.txt", for: req)
/// app.get("file-stream") { req -> Response in
/// return req.fileio.streamFile(at: "/path/to/file.txt", for: req)
/// }
///
/// Streaming file responses respect `E-Tag` headers present in the request.
Expand All @@ -50,6 +49,8 @@ public struct FileIO: Sendable {
/// HTTP request context.
let request: Request

let fileSystem: FileSystem = .shared

/// Creates a new `FileIO`.
///
/// See `Request.fileio()` to create one.
Expand All @@ -61,12 +62,12 @@ public struct FileIO: Sendable {

/// Reads the contents of a file at the supplied path.
///
/// let data = try req.fileio().read(file: "/path/to/file.txt").wait()
/// let data = try req.fileio.collectFile(at: "/path/to/file.txt").wait()
/// print(data) // file data
///
/// - parameters:
/// - path: Path to file on the disk.
/// - returns: `Future` containing the file data.
/// - returns: `Future` containing the file data as a `ByteBuffer`.
public func collectFile(at path: String) -> EventLoopFuture<ByteBuffer> {
let dataWrapper: NIOLockedValueBox<ByteBuffer> = .init(self.allocator.buffer(capacity: 0))
return self.readFile(at: path) { new in
Expand All @@ -78,7 +79,7 @@ public struct FileIO: Sendable {

/// Reads the contents of a file at the supplied path in chunks.
///
/// try req.fileio().readChunked(file: "/path/to/file.txt") { chunk in
/// try req.fileio.readFile(at: "/path/to/file.txt") { chunk in
/// print("chunk: \(data)")
/// }.wait()
///
Expand Down Expand Up @@ -113,7 +114,7 @@ public struct FileIO: Sendable {
/// has not been modified since last served. This method will also set the `"Content-Type"` header
/// automatically if an appropriate `MediaType` can be found for the file's suffix.
///
/// router.get("file-stream") { req in
/// app.get("file-stream") { req in
/// return req.fileio.streamFile(at: "/path/to/file.txt")
/// }
///
Expand Down Expand Up @@ -383,6 +384,19 @@ public struct FileIO: Sendable {
}
}

/// Async version of `read(path:fromOffset:byteCount:chunkSize:onRead)`
private func read(
path: String,
fromOffset offset: Int64,
byteCount: Int
) async throws -> ByteBuffer {
let fd = try NIOFileHandle(path: path)
defer {
try? fd.close()
}
return try await self.io.read(fileHandle: fd, fromOffset: offset, byteCount: byteCount, allocator: allocator)
}

/// Write the contents of buffer to a file at the supplied path.
///
/// let data = ByteBuffer(string: "ByteBuffer")
Expand Down Expand Up @@ -422,11 +436,101 @@ public struct FileIO: Sendable {

// update hash in dictionary
request.application.storage[FileMiddleware.ETagHashes.self]?[path] = FileMiddleware.ETagHashes.FileHash(lastModified: lastModified, digestHex: digest.hex)

return digest.hex
}
}
}

// MARK: - Concurrency
/// Reads the contents of a file at the supplied path.
///
/// let data = try await req.fileio.collectFile(file: "/path/to/file.txt")
/// print(data) // file data
///
/// - parameters:
/// - path: Path to file on the disk.
/// - returns: `ByteBuffer` containing the file data.
public func collectFile(at path: String) async throws -> ByteBuffer {
guard let fileSize = try await FileSystem.shared.info(forFileAt: .init(path))?.size else {
throw Abort(.internalServerError)
}
return try await self.read(path: path, fromOffset: 0, byteCount: Int(fileSize))
}

/// Wrapper around `NIOFileSystem.FileChunks`.
/// This can be removed once `NIOFileSystem` reaches a stable API.
public struct FileChunks: AsyncSequence {
public typealias Element = ByteBuffer
private let fileHandle: NIOFileSystem.FileHandleProtocol
private let fileChunks: NIOFileSystem.FileChunks

init(fileChunks: NIOFileSystem.FileChunks, fileHandle: some NIOFileSystem.FileHandleProtocol) {
self.fileChunks = fileChunks
self.fileHandle = fileHandle
}

public struct FileChunksIterator: AsyncIteratorProtocol {
private var iterator: NIOFileSystem.FileChunks.AsyncIterator
private let fileHandle: NIOFileSystem.FileHandleProtocol

fileprivate init(wrapping iterator: NIOFileSystem.FileChunks.AsyncIterator, fileHandle: some NIOFileSystem.FileHandleProtocol) {
self.iterator = iterator
self.fileHandle = fileHandle
}

public mutating func next() async throws -> ByteBuffer? {
let chunk = try await iterator.next()
if chunk == nil {
try await fileHandle.close()
}
return chunk
}
}

public func makeAsyncIterator() -> FileChunksIterator {
FileChunksIterator(wrapping: fileChunks.makeAsyncIterator(), fileHandle: fileHandle)
}
}

/// Reads the contents of a file at the supplied path in chunks.
///
/// for chunk in try await req.fileio.readFile(at: "/path/to/file.txt") {
/// print("chunk: \(data)")
/// }
///
/// - parameters:
/// - path: Path to file on the disk.
/// - chunkSize: Maximum size for the file data chunks.
/// - returns: `FileChunks` containing the file data chunks.
public func readFile(
at path: String,
chunkSize: Int = NonBlockingFileIO.defaultChunkSize
) async throws -> FileChunks {
let filePath = FilePath(path)

let readHandle = try await fileSystem.openFile(forReadingAt: filePath)
let chunks = readHandle.readChunks(chunkLength: .bytes(Int64(chunkSize)))

return FileChunks(fileChunks: chunks, fileHandle: readHandle)
}

/// Write the contents of buffer to a file at the supplied path.
///
/// let data = ByteBuffer(string: "ByteBuffer")
/// try await req.fileio.writeFile(data, at: "/path/to/file.txt")
///
/// - parameters:
/// - path: Path to file on the disk.
/// - buffer: The `ByteBuffer` to write.
/// - returns: `Void` when the file write is finished.
public func writeFile(_ buffer: ByteBuffer, at path: String) async throws {
let fd = try NIOFileHandle(path: path, mode: .write, flags: .allowFileCreation())
defer {
try? fd.close()
}
try await self.io.write(fileHandle: fd, buffer: buffer)
}
}

extension HTTPHeaders.Range.Value {
Expand Down
35 changes: 35 additions & 0 deletions Tests/VaporTests/FileTests.swift
Expand Up @@ -537,4 +537,39 @@ final class FileTests: XCTestCase {
XCTAssertEqual(res.status, .badRequest)
}
}

func testAsyncFileWrite() async throws {
let app = Application(.testing)
defer { app.shutdown() }

let request = Request(application: app, on: app.eventLoopGroup.next())

let data = "Hello"
let path = "/tmp/fileio_write.txt"

try await request.fileio.writeFile(ByteBuffer(string: data), at: path)
defer { try? FileManager.default.removeItem(atPath: path) }

let result = try String(contentsOfFile: path)
XCTAssertEqual(result, data)
}

func testAsyncFileRead() async throws {
let app = Application(.testing)
defer { app.shutdown() }

let request = Request(application: app, on: app.eventLoopGroup.next())

let path = "/" + #filePath.split(separator: "/").dropLast().joined(separator: "/") + "/Utilities/long-test-file.txt"

let content = try String(contentsOfFile: path)

var readContent = ""
let file = try await request.fileio.readFile(at: path, chunkSize: 16 * 1024) // 32Kb, ~5 chunks
for try await chunk in file {
readContent += String(buffer: chunk)
}

XCTAssertEqual(readContent, content, "The content read from the file does not match the expected content.")
}
}

0 comments on commit 4c80aab

Please sign in to comment.