Skip to content

Commit

Permalink
Reworked Scenario.kt to consist of only specifications. The Specs are…
Browse files Browse the repository at this point in the history
… turned into objects when the scenario is being executed by ScenarioRunner.kt (#227)
  • Loading branch information
DanteNiewenhuis committed Apr 29, 2024
1 parent 2dc44c7 commit 1b8e813
Show file tree
Hide file tree
Showing 18 changed files with 89 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@

package org.opendc.compute.topology

import org.opendc.compute.topology.specs.ClusterJSONSpec
import org.opendc.compute.topology.specs.ClusterSpec
import org.opendc.compute.topology.specs.HostJSONSpec
import org.opendc.compute.topology.specs.HostSpec
import org.opendc.compute.topology.specs.TopologyJSONSpec
import org.opendc.compute.topology.specs.TopologySpec
import org.opendc.simulator.compute.SimPsuFactories
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
Expand All @@ -45,6 +45,16 @@ import java.util.random.RandomGenerator
*/
private val reader = TopologyReader()

/**
* Construct a topology from the specified [pathToFile].
*/
public fun clusterTopology(
pathToFile: String,
random: RandomGenerator = SplittableRandom(0),
): List<HostSpec> {
return clusterTopology(File(pathToFile), random)
}

/**
* Construct a topology from the specified [file].
*/
Expand All @@ -68,9 +78,9 @@ public fun clusterTopology(
}

/**
* Helper method to convert a [TopologyJSONSpec] into a list of [HostSpec]s.
* Helper method to convert a [TopologySpec] into a list of [HostSpec]s.
*/
private fun TopologyJSONSpec.toHostSpecs(random: RandomGenerator): List<HostSpec> {
private fun TopologySpec.toHostSpecs(random: RandomGenerator): List<HostSpec> {
return clusters.flatMap { cluster ->
List(cluster.count) {
cluster.toHostSpecs(random)
Expand All @@ -79,11 +89,11 @@ private fun TopologyJSONSpec.toHostSpecs(random: RandomGenerator): List<HostSpec
}

/**
* Helper method to convert a [ClusterJSONSpec] into a list of [HostSpec]s.
* Helper method to convert a [ClusterSpec] into a list of [HostSpec]s.
*/
private var clusterId = 0

private fun ClusterJSONSpec.toHostSpecs(random: RandomGenerator): List<HostSpec> {
private fun ClusterSpec.toHostSpecs(random: RandomGenerator): List<HostSpec> {
val hostSpecs =
hosts.flatMap { host ->
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ package org.opendc.compute.topology
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromStream
import org.opendc.compute.topology.specs.TopologyJSONSpec
import org.opendc.compute.topology.specs.TopologySpec
import java.io.File
import java.io.InputStream

Expand All @@ -34,9 +34,9 @@ import java.io.InputStream
*/
public class TopologyReader {
@OptIn(ExperimentalSerializationApi::class)
public fun read(file: File): TopologyJSONSpec {
public fun read(file: File): TopologySpec {
val input = file.inputStream()
val obj = Json.decodeFromStream<TopologyJSONSpec>(input)
val obj = Json.decodeFromStream<TopologySpec>(input)

return obj
}
Expand All @@ -45,8 +45,8 @@ public class TopologyReader {
* Read the specified [input].
*/
@OptIn(ExperimentalSerializationApi::class)
public fun read(input: InputStream): TopologyJSONSpec {
val obj = Json.decodeFromStream<TopologyJSONSpec>(input)
public fun read(input: InputStream): TopologySpec {
val obj = Json.decodeFromStream<TopologySpec>(input)
return obj
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import kotlinx.serialization.Serializable
* @param clusters List of the clusters in this topology
*/
@Serializable
public data class TopologyJSONSpec(
val clusters: List<ClusterJSONSpec>,
public data class TopologySpec(
val clusters: List<ClusterSpec>,
val schemaVersion: Int = 1,
)

Expand All @@ -43,7 +43,7 @@ public data class TopologyJSONSpec(
* @param location Location of the cluster. This can impact the carbon intensity
*/
@Serializable
public data class ClusterJSONSpec(
public data class ClusterSpec(
val name: String = "Cluster",
val count: Int = 1,
val hosts: List<HostJSONSpec>,
Expand All @@ -62,9 +62,9 @@ public data class ClusterJSONSpec(
@Serializable
public data class HostJSONSpec(
val name: String = "Host",
val cpu: CPUJSONSpec,
val memory: MemoryJSONSpec,
val powerModel: PowerModelJSONSpec = PowerModelJSONSpec("linear", 350.0, 400.0, 200.0),
val cpu: CPUSpec,
val memory: MemorySpec,
val powerModel: PowerModelSpec = PowerModelSpec("linear", 350.0, 400.0, 200.0),
val count: Int = 1,
)

Expand All @@ -78,7 +78,7 @@ public data class HostJSONSpec(
* @param coreSpeed The speed of the cores in Mhz
*/
@Serializable
public data class CPUJSONSpec(
public data class CPUSpec(
val vendor: String = "unknown",
val modelName: String = "unknown",
val arch: String = "unknown",
Expand All @@ -97,7 +97,7 @@ public data class CPUJSONSpec(
* @param memorySize The size of the memory Unit in MiB
*/
@Serializable
public data class MemoryJSONSpec(
public data class MemorySpec(
val vendor: String = "unknown",
val modelName: String = "unknown",
val arch: String = "unknown",
Expand All @@ -106,7 +106,7 @@ public data class MemoryJSONSpec(
)

@Serializable
public data class PowerModelJSONSpec(
public data class PowerModelSpec(
val modelType: String,
val power: Double = 400.0,
val maxPower: Double,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package org.opendc.experiments.base.runner

import FailureModelSpec
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
Expand All @@ -33,11 +34,8 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.api.ServerWatcher
import org.opendc.compute.service.ComputeService
import org.opendc.compute.simulator.failure.FailureModel
import org.opendc.compute.workload.VirtualMachine
import java.time.InstantSource
import java.util.Random
import kotlin.coroutines.coroutineContext
import kotlin.math.max

/**
Expand Down Expand Up @@ -76,25 +74,24 @@ public class RunningServerWatcher : ServerWatcher {
* @param trace The trace to simulate.
* @param seed The seed to use for randomness.
* @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time).
* @param failureModel A failure model to use for injecting failures.
* @param failureModelSpec A failure model to use for injecting failures.
*/
public suspend fun ComputeService.replay(
clock: InstantSource,
trace: List<VirtualMachine>,
seed: Long,
failureModelSpec: FailureModelSpec? = null,
seed: Long = 0,
submitImmediately: Boolean = false,
failureModel: FailureModel? = null,
) {
val injector = failureModel?.createInjector(coroutineContext, clock, this, Random(seed))
// TODO: add failureModel functionality
val client = newClient()

// Create new image for the virtual machine
val image = client.newImage("vm-image")

try {
coroutineScope {
// Start the fault injector
injector?.start()
// TODO: start failure model when implemented

var simulationOffset = Long.MIN_VALUE

Expand All @@ -107,9 +104,6 @@ public suspend fun ComputeService.replay(
simulationOffset = start - now
}

// Make sure the trace entries are ordered by submission time
// assert(start - simulationOffset >= 0) { "Invalid trace order" }

// Delay the server based on the startTime given by the trace.
if (!submitImmediately) {
delay(max(0, (start - now - simulationOffset)))
Expand Down Expand Up @@ -146,7 +140,7 @@ public suspend fun ComputeService.replay(
}
yield()
} finally {
injector?.close()
// TODO: close failure model when implemented
client.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ import org.opendc.compute.simulator.provisioner.registerComputeMonitor
import org.opendc.compute.simulator.provisioner.setupComputeService
import org.opendc.compute.simulator.provisioner.setupHosts
import org.opendc.compute.telemetry.export.parquet.ParquetComputeMonitor
import org.opendc.compute.topology.clusterTopology
import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.experiments.base.models.scenario.Scenario
import org.opendc.experiments.base.scenario.Scenario
import org.opendc.simulator.kotlin.runSimulation
import java.io.File
import java.time.Duration
Expand All @@ -47,10 +48,10 @@ import java.util.stream.LongStream
/**
* Run scenario when no pool is available for parallel execution
*
* @param scenario The scenario to run
* @param scenarios The scenarios to run
* @param parallelism The number of scenarios that can be run in parallel
*/
public fun runScenario(
public fun runScenarios(
scenarios: List<Scenario>,
parallelism: Int,
) {
Expand Down Expand Up @@ -110,12 +111,14 @@ public fun runScenario(
runSimulation {
val serviceDomain = "compute.opendc.org"
Provisioner(dispatcher, seed).use { provisioner ->

val topology = clusterTopology(scenario.topology.pathToFile, Random(seed))
provisioner.runSteps(
setupComputeService(
serviceDomain,
{ createComputeScheduler(ComputeSchedulerEnum.Mem, Random(it.seeder.nextLong())) },
),
setupHosts(serviceDomain, scenario.topology, optimize = true),
setupHosts(serviceDomain, topology, optimize = true),
)

val workloadLoader = ComputeWorkloadLoader(File(scenario.workload.pathToFile))
Expand All @@ -126,42 +129,10 @@ public fun runScenario(
saveInOutputFolder(provisioner, serviceDomain, scenario, seed, startTime, carbonTrace)

val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
service.replay(timeSource, vms, seed, failureModel = scenario.failureModel)
service.replay(timeSource, vms, failureModelSpec = scenario.failureModel, seed = seed)
}
}

/**
* When the simulation is run, saves the simulation results into a seed folder. This is useful for debugging purposes.
* @param provisioner The provisioner used to setup and run the simulation.
* @param serviceDomain The domain of the compute service.
* @param scenario The scenario being run in the simulation.
* @param seed The seed used for randomness in the simulation.
* @param partition The partition name for the output data.
* @param startTime The start time of the simulation.
*/
public fun saveInSeedFolder(
provisioner: Provisioner,
serviceDomain: String,
scenario: Scenario,
seed: Long,
partition: String,
startTime: Duration,
) {
provisioner.runStep(
registerComputeMonitor(
serviceDomain,
ParquetComputeMonitor(
File(scenario.outputFolder),
partition,
bufferSize = 4096,
),
Duration.ofSeconds(scenario.exportModel.exportInterval),
startTime,
),
)
}

/**
* Saves the simulation results into a specific output folder received from the input.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
* SOFTWARE.
*/

package org.opendc.experiments.base.models.scenario
package org.opendc.experiments.base.scenario

import AllocationPolicySpec
import ExportModelSpec
import FailureModelSpec
import ScenarioTopologySpec
import WorkloadSpec
import org.opendc.compute.simulator.failure.FailureModel
import org.opendc.compute.topology.specs.HostSpec

/**
* A data class representing a scenario for a set of experiments.
Expand All @@ -42,10 +42,10 @@ import org.opendc.compute.topology.specs.HostSpec
* @property initialSeed The Int representing the initial seed of the scenario. It defaults to 0.
*/
public data class Scenario(
val topology: List<HostSpec>,
val topology: ScenarioTopologySpec,
val workload: WorkloadSpec,
val allocationPolicy: AllocationPolicySpec,
val failureModel: FailureModel?,
val failureModel: FailureModelSpec?,
val carbonTracePath: String? = null,
val exportModel: ExportModelSpec = ExportModelSpec(),
val outputFolder: String = "output",
Expand Down

0 comments on commit 1b8e813

Please sign in to comment.