Skip to content

Commit

Permalink
Revamped failure models (#228)
Browse files Browse the repository at this point in the history
  • Loading branch information
DanteNiewenhuis committed May 7, 2024
1 parent 7c0691e commit ad20465
Show file tree
Hide file tree
Showing 68 changed files with 2,853 additions and 488 deletions.
3 changes: 3 additions & 0 deletions opendc-compute/opendc-compute-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ description = "API interface for the OpenDC Compute service"
plugins {
`kotlin-library-conventions`
}
dependencies {
implementation(project(mapOf("path" to ":opendc-simulator:opendc-simulator-compute")))
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

package org.opendc.compute.api

import org.opendc.simulator.compute.workload.SimWorkload
import java.util.UUID

/**
Expand Down Expand Up @@ -113,6 +114,11 @@ public interface ComputeClient : AutoCloseable {
start: Boolean = true,
): Server

public fun rescheduleServer(
server: Server,
workload: SimWorkload,
)

/**
* Release the resources associated with this client, preventing any further API calls.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@

package org.opendc.compute.carbon

import mu.KotlinLogging
import org.opendc.trace.Trace
import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP
import org.opendc.trace.conv.CARBON_INTENSITY_VALUE
import org.opendc.trace.conv.TABLE_CARBON_INTENSITY
import org.opendc.trace.conv.TABLE_CARBON_INTENSITIES
import java.io.File
import java.lang.ref.SoftReference
import java.time.Instant
Expand All @@ -38,11 +37,6 @@ import java.util.concurrent.ConcurrentHashMap
* @param baseDir The directory containing the traces.
*/
public class CarbonTraceLoader {
/**
* The logger for this instance.
*/
private val logger = KotlinLogging.logger {}

/**
* The cache of workloads.
*/
Expand All @@ -54,13 +48,11 @@ public class CarbonTraceLoader {
* Read the metadata into a workload.
*/
private fun parseCarbon(trace: Trace): List<CarbonFragment> {
val reader = checkNotNull(trace.getTable(TABLE_CARBON_INTENSITY)).newReader()
val reader = checkNotNull(trace.getTable(TABLE_CARBON_INTENSITIES)).newReader()

val startTimeCol = reader.resolve(CARBON_INTENSITY_TIMESTAMP)
val carbonIntensityCol = reader.resolve(CARBON_INTENSITY_VALUE)

val entries = mutableListOf<CarbonFragment>()

try {
while (reader.nextRow()) {
val startTime = reader.getInstant(startTimeCol)!!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,21 @@
* SOFTWARE.
*/

package org.opendc.compute.simulator.failure
description = "OpenDC Failure Service implementation"

import org.opendc.compute.service.ComputeService
import java.time.InstantSource
import java.util.random.RandomGenerator
import kotlin.coroutines.CoroutineContext
// Build configuration
plugins {
`kotlin-library-conventions`
}

/**
* Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts.
*/
public interface FailureModel {
/**
* Construct a [HostFaultInjector] for the specified [service].
*/
public fun createInjector(
context: CoroutineContext,
clock: InstantSource,
service: ComputeService,
random: RandomGenerator,
): HostFaultInjector
dependencies {
api(projects.opendcCompute.opendcComputeApi)
implementation(projects.opendcCommon)
implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-api")))
implementation(project(mapOf("path" to ":opendc-simulator:opendc-simulator-compute")))
implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-service")))
implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-simulator")))

api(libs.commons.math3)
implementation(libs.kotlin.logging)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@
* SOFTWARE.
*/

package org.opendc.compute.simulator.failure
package org.opendc.compute.failure.hostfault

import org.opendc.compute.service.ComputeService
import org.opendc.compute.simulator.SimHost
import java.time.InstantSource

/**
* Interface responsible for applying the fault to a host.
*/
public interface HostFault {
public abstract class HostFault(
private val service: ComputeService,
) {
/**
* Apply the fault to the specified [victims].
*/
public suspend fun apply(
clock: InstantSource,
public abstract suspend fun apply(
victims: List<SimHost>,
faultDuration: Long,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,43 @@
* SOFTWARE.
*/

package org.opendc.compute.simulator.failure
package org.opendc.compute.failure.hostfault

import kotlinx.coroutines.delay
import org.apache.commons.math3.distribution.RealDistribution
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.service.ComputeService
import org.opendc.compute.simulator.SimHost
import java.time.InstantSource
import kotlin.math.roundToLong
import org.opendc.simulator.compute.workload.SimWorkload

/**
* A type of [HostFault] where the hosts are stopped and recover after some random amount of time.
* A type of [HostFault] where the hosts are stopped and recover after a given amount of time.
*/
public class StartStopHostFault(private val duration: RealDistribution) : HostFault {
public class StartStopHostFault(
private val service: ComputeService,
) : HostFault(service) {
override suspend fun apply(
clock: InstantSource,
victims: List<SimHost>,
faultDuration: Long,
) {
val client: ComputeClient = service.newClient()

for (host in victims) {
host.fail()
}
val servers = host.instances

val df = (duration.sample() * 1000).roundToLong() // seconds to milliseconds
val snapshots = servers.map { (it.meta["workload"] as SimWorkload).snapshot() }
host.fail()

// Handle long overflow
if (clock.millis() + df <= 0) {
return
for ((server, snapshot) in servers.zip(snapshots)) {
client.rescheduleServer(server, snapshot)
}
}

delay(df)
delay(faultDuration)

for (host in victims) {
host.recover()
}
}

override fun toString(): String = "StartStopHostFault[$duration]"
override fun toString(): String = "StartStopHostFault"
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,44 +20,39 @@
* SOFTWARE.
*/

package org.opendc.compute.simulator.internal
package org.opendc.compute.failure.models

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.apache.commons.math3.distribution.RealDistribution
import org.opendc.compute.failure.hostfault.HostFault
import org.opendc.compute.failure.hostfault.StartStopHostFault
import org.opendc.compute.failure.victimselector.StochasticVictimSelector
import org.opendc.compute.service.ComputeService
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.failure.HostFault
import org.opendc.compute.simulator.failure.HostFaultInjector
import org.opendc.compute.simulator.failure.VictimSelector
import java.time.InstantSource
import java.util.random.RandomGenerator
import kotlin.coroutines.CoroutineContext
import kotlin.math.roundToLong

/**
* Internal implementation of the [HostFaultInjector] interface.
*
* @param context The scope to run the fault injector in.
* @param clock The [InstantSource] to keep track of simulation time.
* @param hosts The set of hosts to inject faults into.
* @param iat The inter-arrival time distribution of the failures (in hours).
* @param selector The [VictimSelector] to select the host victims.
* @param fault The type of [HostFault] to inject.
* Factory interface for constructing [FailureModel] for modeling failures of compute service hosts.
*/
internal class HostFaultInjectorImpl(
private val context: CoroutineContext,
private val clock: InstantSource,
private val hosts: Set<SimHost>,
private val iat: RealDistribution,
private val selector: VictimSelector,
private val fault: HostFault,
) : HostFaultInjector {
/**
* The scope in which the injector runs.
*/
private val scope = CoroutineScope(context + Job())
public abstract class FailureModel(
context: CoroutineContext,
protected val clock: InstantSource,
protected val service: ComputeService,
protected val random: RandomGenerator,
) : AutoCloseable {
protected val scope: CoroutineScope = CoroutineScope(context + Job())

// TODO: could at some point be extended to different types of faults
protected val fault: HostFault = StartStopHostFault(service)

// TODO: could at some point be extended to different types of victim selectors
protected val victimSelector: StochasticVictimSelector = StochasticVictimSelector(random)

protected val hosts: Set<SimHost> = service.hosts.map { it as SimHost }.toSet()

/**
* The [Job] that awaits the nearest fault in the system.
Expand All @@ -67,7 +62,7 @@ internal class HostFaultInjectorImpl(
/**
* Start the fault injection into the system.
*/
override fun start() {
public fun start() {
if (job != null) {
return
}
Expand All @@ -79,25 +74,7 @@ internal class HostFaultInjectorImpl(
}
}

/**
* Converge the injection process.
*/
private suspend fun runInjector() {
while (true) {
// Make sure to convert delay from hours to milliseconds
val d = (iat.sample() * 3.6e6).roundToLong()

// Handle long overflow
if (clock.millis() + d <= 0) {
return
}

delay(d)

val victims = selector.select(hosts)
fault.apply(clock, victims)
}
}
public abstract suspend fun runInjector()

/**
* Stop the fault injector.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2024 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.opendc.compute.failure.models

import kotlinx.coroutines.delay
import org.apache.commons.math3.distribution.RealDistribution
import org.opendc.compute.service.ComputeService
import java.time.InstantSource
import java.util.random.RandomGenerator
import kotlin.coroutines.CoroutineContext
import kotlin.math.max
import kotlin.math.min
import kotlin.math.roundToLong

/**
* Sample based failure model
*
* @property context
* @property clock
* @property service
* @property random
* @property iatSampler A distribution from which the time until the next fault is sampled in ms
* @property durationSampler A distribution from which the duration of a fault is sampled in s
* @property nohSampler A distribution from which the number of hosts that fault is sampled.
*/
public class SampleBasedFailureModel(
context: CoroutineContext,
clock: InstantSource,
service: ComputeService,
random: RandomGenerator,
private val iatSampler: RealDistribution,
private val durationSampler: RealDistribution,
private val nohSampler: RealDistribution,
) : FailureModel(context, clock, service, random) {
override suspend fun runInjector() {
while (true) {
val iatSample = max(0.0, iatSampler.sample())
val intervalDuration = (iatSample * 3.6e6).roundToLong()

// Handle long overflow
if (clock.millis() + intervalDuration <= 0) {
return
}

delay(intervalDuration)

val numberOfHosts = min(1.0, max(0.0, nohSampler.sample()))
val victims = victimSelector.select(hosts, numberOfHosts)

val durationSample = max(0.0, durationSampler.sample())
val faultDuration = (durationSample * 3.6e6).toLong()
fault.apply(victims, faultDuration)

break
}
}
}

0 comments on commit ad20465

Please sign in to comment.