
Releases: lampepfl/gears

v0.2.0

15 May 17:40
a4a6b72

Welcome to a new release of Gears!

Highlights

  • We now ship artifacts for both Scala JVM and Scala Native! Adding Gears to your project is now as simple as using dep "ch.epfl.lamp::gears::0.2.0"!
  • Alongside the shipped artifacts, Gears no longer requires a snapshot/locally published version of Scala Native; we depend on 0.5.1 directly.
  • A lot of the internal designs were brought up-to-date with the Gears book, including (but not limited to):
    • Previously, Async was the only requirement to create Futures. Now we require a super-capability of Async, named Async.Spawn. This can be acquired through Async.group (or directly from the Async contexts of Async.blocking and Future.apply). Writing functions that spawn Futures is slightly more involved:
      def createFuture(using Async) =
        Async.group:
          val fut = Future(...)
          fut.await // the Future must be awaited before the group ends
      This guarantees that futures do not escape the function scope, and hence don't become "dangling" once the function returns. See #46 and the section on Async.Spawn of the book for more details.
    • Listener locks no longer need to be nested. This should simplify the requirements for implementing custom Listeners, as well as improve the performance of races (especially nested races).
    • alt has generally been renamed to awaitFirst, for consistency with awaitAll.
  • We now host a scaladoc-generated API reference. This is currently tracking main, but will soon be updated to track the latest stable version as well.
  • We are now officially on the Apache 2 license!

Other changes

New Contributors

Full Changelog: v0.1.0...v0.2.0

v0.2.0-RC3

14 May 08:50
1c63af7
Pre-release

This is a test release to check sbt-ci-release workflow. Final v0.2.0 is coming soon.

v0.2.0-RC2

14 May 08:44
37d17e7
Pre-release

This is a test release to check sbt-ci-release workflow. Final v0.2.0 is coming soon.

v0.2.0-RC1

14 May 08:20
34668c7
Pre-release

This is a test release to check sbt-ci-release workflow. Final v0.2.0 is coming soon.

First experimental release

10 Dec 20:57
793cf70

Welcome to the first release of gears, the experimental cross-platform asynchronous programming library for Scala 3!

In this first release, we introduce both low-level/unstructured concepts of asynchronous programming (sources, listeners, channels) and concepts for high-level structured concurrency. These are the first iteration of a complete design and might have some holes in usability and performance. Feedback is appreciated!

A tutorial about the basic concepts will be available soon.

What is included?

User-facing, high level asynchronous programming

  • gears.async introduces the concept of an Async context/capability: functions that implicitly take an Async context (using Async) are suspendable computations and are capable of both performing asynchronous operations and spawning more concurrent asynchronous computations.
    This is the recommended way to write asynchronous code: Functions that explicitly need to suspend or spawn concurrent computations should take an Async context:
    def performAsyncIO(using Async): Int = ???
    However, higher-order functions that do not explicitly use these capabilities do not have to take an Async context.
    val result: Seq[Int] = (1 to 5).map(_ => performAsyncIO)
                                 // ^^^ map is the regular Seq.map implementation!
  • Spawning concurrent computations: computations that run concurrently with the caller can be spawned by invoking Future.apply with a body:
    val resultFuts: Seq[Future[Int]] = (1 to 5).map(_ => Future(performAsyncIO))
    Different from the previous example, this creates 5 concurrently (possibly in parallel) running computations, with a Future handle that you can await. Awaiting effectively suspends the current computation until the result of the awaited computation is available.
    val resultsTry: Seq[Try[Int]] = resultFuts.map(_.awaitResult)
    This awaits the results in order. Note that .awaitResult requires an Async context. It returns a Try[Int], since the computation inside the Future may throw or be interrupted. To bypass this and directly get the result (rethrowing on Failure), use .await.
    val results: Seq[Int] = resultFuts.map(_.await)
  • Working with Futures: some library functions are provided to make working with futures more convenient:
    • Seq[Future[T]].awaitAll waits for all futures to complete and returns a Seq[T] as a result, rethrowing the first failure that appears.
    • Seq[Future[T]].altAll waits for the first future to succeed, returning its value. If all fail, the last failure is rethrown.
      Both variants provide a WithCancel alternative for owned futures, where if the wait is short-circuited, the other futures are optimistically cancelled.
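      As a rough sketch (assuming the default platform support from gears.async.default.given, described later), these combinators can be used like this:
      import gears.async.*
      import gears.async.default.given

      Async.blocking:
        val futs = (1 to 3).map(i => Future(i * 10))
        val all = futs.awaitAll // Seq(10, 20, 30); rethrows the first failure
        val one = futs.altAll   // value of whichever future succeeds first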
    • Async.select allows you to race the futures, and continue the computation based on which value was received:
    val fFut = Future(f())
    val gFut = Future(g())
    val results = Async.select(
      fFut.andThen: x =>
        ???, // handle f result ...
      gFut.andThen: y =>
        ??? // handle g result ...
    )
  • Structured Concurrency: Every Async context carries a completion group, tracking all concurrently running cancellable computations in a tree-like structure (groups can contain other groups). A group can be manually spawned by Async.group, which is automatically linked to the group present in the current Async context:
    val compute = Async.group:
      val f1 = Future(f())
      val g1 = Future(g())
      Seq(f1, g1).altAll
    Upon the return of the main body, all running concurrent computations are cancelled (by calling .cancel()) and awaited. This guarantees that once outside Async.group, no concurrent computations inside that group can still be running, and we maintain the tree-like structure.
    Future.apply automatically creates a new group for its running body, so when you decide to compute something in parallel, you completely control its lifetime, including all of the concurrent computations that it spawned!
    When the main body of the group completes, all unneeded/unawaited running computations are cancelled and awaited for cleanup.
  • Going in-and-out of Async: Async.blocking creates an Async context out of thin air (given a suspension implementation and a scheduler)! This Async context blocks the running thread for suspension, which is typically not what you want. However, it is useful in two scenarios:
    • As the root Async context: you would put this under the main function of the application.
    • As a truly blocking call to an asynchronous operation: not recommended, but this works similarly to Node.js's ...Sync variants,
      where the thread is blocked until the operation completes, returning the results directly.
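    For instance, a minimal application entry point might look like the following (a sketch; gears.async.default.given provides a platform implementation as described under Cross Platform):
    import gears.async.*
    import gears.async.default.given

    @main def entry(): Unit =
      Async.blocking: // the root Async context; suspension blocks the main thread
        val answer = Future(21 * 2).await
        println(answer) // 42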
  • Cross Platform: gears is implemented generically, only assuming a suspension interface and a scheduler to work (see AsyncSupport). However, two default implementations are provided:
    • Using JVM >=21's virtual threads, alongside the usual JVM virtual thread scheduler
    • Using Scala Native 0.5 delimited continuations, with the ForkJoinPool scheduler
      Both can be provided by importing gears.async.default.given, which automatically selects the correct implementation for the platform.

Mid-level, unstructured asynchronous operations

  • Sources are the abstractions of asynchronous resources that an Async context can .awaitResult for. From the high-level interface, we have been using Future[T], which is actually a Source[Try[T]] with a special property: once completed, Future[T] always returns the same result.
  • Listeners are the primary out-of-band way to receive values from a Source.
    The simplest listener can be created with Listener.apply, which takes the item T and the origin Source[T] and does something with them. Listeners always receive at most one item from the Source, and are removed from the Source's listener list once completed.
    Listener bodies are run on the same computation that resolves the sources, so they need to be kept lightweight to maintain a high degree of concurrency performance.
    • Locking listeners adds synchronization capabilities to a listener, to allow it to listen to multiple sources and decide whether it would be open to accepting the item or it is already expired. To know more, check out the Listener interface.
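    A minimal sketch of a hand-rolled listener (assuming Listener.apply takes a (T, Source[T]) => Unit consumer as described above, attached via onComplete; source: Source[Int] is hypothetical):
    // receives at most one item, then is removed from the source's listener list
    val printOnce = Listener[Int]((item, origin) => println(s"received $item"))
    source.onComplete(printOnce) // attach the listener; it fires when an item arrives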
  • Channels are a bidirectional means of communication between concurrent processes. They provide .send(x: T)(using Async) and .read()(using Async) as suspending methods, but also .readSource and .sendSource(x: T) sources that read/send an item when a listener is attached and accepts the item/event.
    Three types of channels are provided:
    • SyncChannels are unbuffered synchronous (rendezvous) channels where sends and reads block until the transfer is actually made.
    • BufferedChannels allow sends to be buffered up to a certain buffer count, completing instantly if buffer space is available.
    • UnboundedChannels allow sends to always be buffered in a growable buffer, returning instantly in all cases. They expose a .sendImmediately method that allows sending without an Async context.
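    A small usage sketch (the buffer-size constructor parameter is an assumption):
    Async.blocking:
      val ch = BufferedChannel[Int](3) // sends complete instantly while the buffer has space
      Future:
        for i <- 1 to 3 do ch.send(i)
      val first = ch.read() // suspends until an item is available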
  • race, select and channel synchronization: gears provides exclusivity when using the above Async.select method with sources: exactly one of the given sources will be resolved by the select (and no item/event will be consumed from the other sources). This is especially important for channels, as you typically would not want items from a channel being raced and thrown away:
    Async.blocking:
      val c1 = UnboundedChannel[Int]()
      val c2 = UnboundedChannel[Int]()
      c1.sendImmediately(1)
      c2.sendImmediately(2)
    
      def getInt = Async.select(
        c1.readSource.andThen(_ + 1),
        c2.readSource.andThen(_ + 2),
      )
    
      val ints = Seq(getInt, getInt) // Seq(2, 4) or Seq(4, 2)
    race has similar exclusivity semantics to select, but takes a list of Source[T] as parameters and returns a Source[T], resolving with an item when one of the inputs produces one. This allows you to compose Sources by racing them losslessly:
    val c1 = UnboundedChannel[Int]()
    val c2 = UnboundedChannel[Int]()
    val c3 = UnboundedChannel[Int]()
    c1.sendImmediately(1)
    c2.sendImmediately(2)
    
    val cs12: Source[Int] = Async.race(c1.readSource, c2.readSource)
    val cs123: Source[Int] = Async.race(cs12, c3.readSource)
    
    val ints = Async.blocking { Seq(cs12.awaitResult, cs123.awaitResult) } // Seq(1, 2) or Seq(2, 1)
  • Creating sources: Source is a trait that can be manually implemented, but some Source creation methods are available:
    • With Promise and Future.withResolver: Promise is a wrapper for a Future that gets externally resolved. It exposes a .complete method which can be called externally to resolve the future and continue all awaiting computations.
      Future.withResolver exposes the same mechanics, but in a more familiar interface to Node.js's Promise constructor, while also allowing the caller to register a method that cleans up the external process when the Future is cancelled.
    • With Source.values: creates a very simple queue-like source that resolves items to listeners as they come.
    • Source transformation API: Source[T] exposes .transformValuesWith, which applies a transformation function to every item in the original source before passing it on to a listener. This can be used to transform sources, but is not recommended, as the transformation will run on the same computation as the original source and might cause unexpected performance problems.
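    For example, externally completing a Future through a Promise might look like this (a sketch; a Try-taking .complete and an .asFuture accessor are assumed from the description above):
    import scala.util.Success

    val p = Promise[Int]()
    // later, e.g. from a callback of an external system:
    p.complete(Success(42))

    Async.blocking:
      val v = p.asFuture.await // 42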

Low-level susp...
