Skip to content

Commit

Permalink
Issue 259 Add executor framework for running viewports in multiple pa…
Browse files Browse the repository at this point in the history
…rallel threads (#338)

* #259 Added executor framework for running multiple viewport calculations on separate thread pool.

* #259 left default thread count as one in SimulMain
  • Loading branch information
chrisjstevo committed Nov 18, 2022
1 parent 9ade0d1 commit bba484a
Show file tree
Hide file tree
Showing 12 changed files with 398 additions and 73 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Expand Up @@ -60,8 +60,8 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>16</maven.compiler.source>
<maven.compiler.target>16</maven.compiler.target>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<scala.version>2.13.10</scala.version>
<metrics.version>4.2.5</metrics.version>
<curator.version>3.1.0</curator.version>
Expand Down
Expand Up @@ -2,13 +2,13 @@ package org.finos.toolbox.logging

import org.finos.toolbox.time.Clock

class LogAtFrequency(millis: Long)(implicit val timeProvider: Clock) {
class LogAtFrequency(millis: Long)(implicit val clock: Clock) {

@volatile private var lastLog: Long = -1

def shouldLog(): Boolean = {

val now = timeProvider.now()
val now = clock.now()

val diff = now - lastLog

Expand Down
@@ -0,0 +1,114 @@
package org.finos.toolbox.thread

import com.typesafe.scalalogging.StrictLogging
import org.finos.toolbox.lifecycle.{LifecycleContainer, LifecycleEnabled}
import org.finos.toolbox.logging.LogAtFrequency
import org.finos.toolbox.thread.executor.ResubmitExecutor
import org.finos.toolbox.time.Clock

import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{Callable, ConcurrentSkipListSet, FutureTask, LinkedBlockingQueue, TimeUnit}
import scala.jdk.CollectionConverters.CollectionHasAsScala

trait WorkItem[T] extends Comparable[WorkItem[T]]{
def doWork(): T
def compareTo(o: WorkItem[T]): Int = {
if(o.hashCode() == this.hashCode()){
0
}else if( o.hashCode() < this.hashCode()){
1
}else if(o.hashCode() > this.hashCode()){
-1
}else{
0
}
}
}

abstract class LifeCycleRunOncePerThreadExecutorRunner[T](val name: String, val countOfThreads: Int, val generateWorkFunc: () => List[WorkItem[T]]) (implicit lifecycle: LifecycleContainer, clock: Clock) extends LifeCycleRunner(name, () => ()) with StrictLogging{

lifecycle(this)

private var retryExecutor: Option[ResubmitExecutor[T]] = None
private final val workQueue = new LinkedBlockingQueue[Runnable]()
private val selfRef = this;
private final val setOfWork = new ConcurrentSkipListSet[WorkItem[T]]()

override def doStart(): Unit = {
logger.info("Starting up viewport runner...")
retryExecutor = Some(new ResubmitExecutor[T](countOfThreads, countOfThreads, 1000, TimeUnit.SECONDS, workQueue){
override def newCallable(r: FutureTask[T], t: Throwable): Callable[T] = {
selfRef.newCallable(r)
}
override def shouldResubmit(r: FutureTask[T], t: Throwable): Boolean = {
setOfWork.contains(newWorkItem(r))
}
override def newWorkItem(r: FutureTask[T]): WorkItem[T] = selfRef.newWorkItem(r)
})
runInBackground()
}

override protected def getRunnable() = {
() => {

while (true) {
val start = clock.now()

val workList = generateWorkFunc()
val addedWork = workList.filter(!setOfWork.contains(_))
val removedWork = CollectionHasAsScala(setOfWork).asScala.filter(item => !workList.contains(item))

removedWork.foreach( item => {
setOfWork.remove(item)
logger.info("Removed work item from viewport threadpool:" + item)
})

addedWork.foreach(item => {
//println("Adding" + item.hashCode())
setOfWork.add(item)
})

retryExecutor match {
case Some(executor) => {
addedWork.foreach(work => {
executor.submit(new Callable[T] {
override def call(): T = {
logger.info("Adding work to vp threadpool.." + work)
work.doWork()
}
})
})
}
case None =>
}

val end = clock.now()

doMinCycleTime(start, end)
if (Thread.interrupted()) {
//shouldContinue.set(false)
logger.debug(s"[$name] interrupted or run once, going to exit")
}
}
}
}

def newCallable(r: FutureTask[T]): Callable[T]
def newWorkItem(r: FutureTask[T]): WorkItem[T]

override def doStop(): Unit = {
retryExecutor match {
case Some(executor) => {
executor.shutdown()
}
case None => //all good
}
logger.info(s"[$name] is exiting....")
}

override def doInitialize(): Unit = {}

override def doDestroy(): Unit = {}

override val lifecycleId: String = this.getClass.getName + "#" + this.hashCode()
}
6 changes: 3 additions & 3 deletions toolbox/src/main/scala/org/finos/toolbox/thread/Runner.scala
Expand Up @@ -9,11 +9,11 @@ import scala.util.control.NonFatal

class Runner(name: String, func: () => Unit, minCycleTime: Long = 1000, runOnce: Boolean = false)(implicit clock: Clock) extends StrictLogging {

private val thread = new NamedThreadFactory(name).newThread(getRunnable)
private val thread = new NamedThreadFactory(name).newThread(getRunnable())

private val shouldContinue = new AtomicBoolean(true)

private def doMinCycleTime(start: Long, end: Long): Unit = {
def doMinCycleTime(start: Long, end: Long): Unit = {
val takenMillis = end - start
if(takenMillis < minCycleTime){

Expand All @@ -38,7 +38,7 @@ class Runner(name: String, func: () => Unit, minCycleTime: Long = 1000, runOnce:
thread.interrupt()
}

protected def getRunnable = {
protected def getRunnable(): Runnable = {
new Runnable {
override def run(): Unit = {
try{
Expand Down
@@ -0,0 +1,56 @@
package org.finos.toolbox.thread.executor

import com.typesafe.scalalogging.StrictLogging
import org.finos.toolbox.logging.LogAtFrequency
import org.finos.toolbox.thread.WorkItem
import org.finos.toolbox.time.Clock

import java.util
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{BlockingQueue, Callable, FutureTask, RunnableFuture, ThreadPoolExecutor}
import scala.concurrent.duration.TimeUnit

/**
* This is a java executor implementation, which will automatically resubmit the existing job into the queue
* when its complete.
*
*/
abstract class ResubmitExecutor[T](corePoolSize: Int, maxPoolSize: Int, keepAliveTime: Long, timeUnit: TimeUnit,
workQueue: BlockingQueue[Runnable])(implicit clock: Clock) extends ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue) with StrictLogging {

private val logEvery = new LogAtFrequency(5_000)
private val shuttingDown = new AtomicBoolean(false)

override def shutdown(): Unit = {
shuttingDown.set(true)
super.shutdown()
}

override def shutdownNow(): util.List[Runnable] = {
shuttingDown.set(true)
super.shutdownNow()
}

override def afterExecute(r: Runnable, t: Throwable): Unit = {
super.afterExecute(r, t)

if(!shuttingDown.get()){
val futureTask = r.asInstanceOf[FutureTask[T]]
if(shouldResubmit(futureTask, t)){
retry(futureTask, t)
if(logEvery.shouldLog()){
logger.info("Finished runnable" + futureTask.get() + " resubmitting...")
}
}

}
}

def retry(runnable: FutureTask[T], t: Throwable): Unit ={
this.submit(newCallable(runnable, t))
}

def newCallable(r: FutureTask[T], t: Throwable): Callable[T]
def shouldResubmit(r: FutureTask[T], t: Throwable): Boolean
def newWorkItem(r: FutureTask[T]): WorkItem[T]
}
6 changes: 4 additions & 2 deletions vuu/src/main/scala/org/finos/vuu/SimulMain.scala
Expand Up @@ -7,7 +7,7 @@ import org.finos.vuu.core.module.metrics.MetricsModule
import org.finos.vuu.core.module.simul.SimulationModule
import org.finos.vuu.core.module.typeahead.TypeAheadModule
import org.finos.vuu.core.module.vui.VuiStateModule
import org.finos.vuu.core.{VuuSecurityOptions, VuuServer, VuuServerConfig, VuuWebSocketOptions}
import org.finos.vuu.core.{VuuSecurityOptions, VuuServer, VuuServerConfig, VuuThreadingOptions, VuuWebSocketOptions}
import org.finos.vuu.net.{AlwaysHappyLoginValidator, Authenticator, LoggedInTokenValidator}
import org.finos.vuu.net.auth.AlwaysHappyAuthenticator
import org.finos.vuu.net.http.VuuHttp2ServerOptions
Expand Down Expand Up @@ -61,7 +61,9 @@ object SimulMain extends App with StrictLogging {
.withWsPort(8090),
VuuSecurityOptions()
.withAuthenticator(authenticator)
.withLoginValidator(new AlwaysHappyLoginValidator)
.withLoginValidator(new AlwaysHappyLoginValidator),
VuuThreadingOptions()
.withViewPortThreads(1)
).withModule(SimulationModule())
.withModule(MetricsModule())
.withModule(VuiStateModule(store))
Expand Down
87 changes: 24 additions & 63 deletions vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala
@@ -1,66 +1,26 @@
package org.finos.vuu.core

import com.typesafe.scalalogging.StrictLogging
import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.lifecycle.{LifecycleContainer, LifecycleEnabled}
import org.finos.toolbox.thread.{LifeCycleRunOncePerThreadExecutorRunner, LifeCycleRunner, WorkItem}
import org.finos.toolbox.time.Clock
import org.finos.vuu.api.{JoinTableDef, TableDef, ViewPortDef}
import org.finos.vuu.core.module.{ModuleContainer, RealizedViewServerModule, StaticServedResource, ViewServerModule}
import org.finos.vuu.core.table.{DataTable, TableContainer}
import org.finos.vuu.net._
import org.finos.vuu.net.auth.AlwaysHappyAuthenticator
import org.finos.vuu.net.http.{Http2Server, VuuHttp2Server, VuuHttp2ServerOptions, VuuSecurityOptions}
import org.finos.vuu.net.http.{Http2Server, VuuHttp2Server}
import org.finos.vuu.net.json.{CoreJsonSerializationMixin, JsonVsSerializer, Serializer}
import org.finos.vuu.net.rest.RestService
import org.finos.vuu.net.rpc.{JsonSubTypeRegistry, RpcHandler}
import org.finos.vuu.net.ws.WebSocketServer
import org.finos.vuu.provider.{JoinTableProvider, JoinTableProviderImpl, Provider, ProviderContainer}
import org.finos.vuu.viewport.{ViewPortAction, ViewPortActionMixin, ViewPortContainer}
import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.lifecycle.{LifecycleContainer, LifecycleEnabled}
import org.finos.toolbox.thread.LifeCycleRunner
import org.finos.toolbox.time.Clock

object VuuSecurityOptions{
def apply(): VuuSecurityOptions = {
VuuSecurityOptionsImpl(new AlwaysHappyAuthenticator, new AlwaysHappyLoginValidator)
}
}

object VuuWebSocketOptions {
def apply(): VuuWebSocketOptions = {
VuuWebSocketOptionsImpl(8090, "/websocket")
}
}

trait VuuWebSocketOptions {

def wsPort: Int

def uri: String
import org.finos.vuu.viewport._

def withWsPort(port: Int): VuuWebSocketOptions

def withUri(uri: String): VuuWebSocketOptions

}

case class VuuSecurityOptionsImpl(authenticator: Authenticator, loginTokenValidator: LoginTokenValidator) extends VuuSecurityOptions{
override def withAuthenticator(authenticator: Authenticator): VuuSecurityOptions = this.copy(authenticator = authenticator)
override def withLoginValidator(tokenValidator: LoginTokenValidator): VuuSecurityOptions = this.copy(authenticator = authenticator)
}

case class VuuWebSocketOptionsImpl(wsPort: Int, uri: String) extends VuuWebSocketOptions {
override def withWsPort(port: Int): VuuWebSocketOptions = this.copy(wsPort = port)

override def withUri(uri: String): VuuWebSocketOptions = this.copy(uri = uri)
}

case class VuuServerConfig(httpOptions: VuuHttp2ServerOptions = VuuHttp2ServerOptions(), wsOptions: VuuWebSocketOptions = VuuWebSocketOptions(), security: VuuSecurityOptions = VuuSecurityOptions(), modules: List[ViewServerModule] = List()) {
def withModule(module: ViewServerModule): VuuServerConfig = {
this.copy(modules = modules ++ List(module))
}
}
import java.util.concurrent.{Callable, FutureTask}

/**
* View Server
* Vuu Server
*/
class VuuServer(config: VuuServerConfig)(implicit lifecycle: LifecycleContainer, timeProvider: Clock, metricsProvider: MetricsProvider) extends LifecycleEnabled with StrictLogging {

Expand Down Expand Up @@ -89,10 +49,6 @@ class VuuServer(config: VuuServerConfig)(implicit lifecycle: LifecycleContainer,

val serverApi = new CoreServerApiHander(viewPortContainer, tableContainer, providerContainer)

//val processor = new RequestProcessor(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer)

//val handler = new ViewServerHandler(serializer, processor)

val factory = new ViewServerHandlerFactoryImpl(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer, moduleContainer)

//order of creation here is important
Expand All @@ -108,8 +64,22 @@ class VuuServer(config: VuuServerConfig)(implicit lifecycle: LifecycleContainer,
val handlerRunner = new LifeCycleRunner("sessionRunner", () => sessionContainer.runOnce(), minCycleTime = 1)
lifecycle(handlerRunner).dependsOn(joinProviderRunner)

val viewPortRunner = new LifeCycleRunner("viewPortRunner", () => viewPortContainer.runOnce())
lifecycle(viewPortRunner).dependsOn(server)
val viewPortRunner = if(config.threading.viewportThreads == 1){
val viewPortRunner = new LifeCycleRunner("viewPortRunner", () => viewPortContainer.runOnce())
lifecycle(viewPortRunner).dependsOn(server)
viewPortRunner

}else {
val viewPortRunner =
new LifeCycleRunOncePerThreadExecutorRunner[ViewPort](s"viewPortExecutorRunner[${config.threading.viewportThreads}]", config.threading.viewportThreads, () => {
viewPortContainer.getViewPorts().filter(_.isEnabled).map(vp => ViewPortWorkItem(vp, viewPortContainer)) })
{
override def newCallable(r: FutureTask[ViewPort]): Callable[ViewPort] = ViewPortCallable(r, viewPortContainer)
override def newWorkItem(r: FutureTask[ViewPort]): WorkItem[ViewPort] = ViewPortWorkItem(r.get(), viewPortContainer)
}
lifecycle(viewPortRunner).dependsOn(server)
viewPortRunner
}

val groupByRunner = new LifeCycleRunner("groupByRunner", () => viewPortContainer.runGroupByOnce())
lifecycle(groupByRunner).dependsOn(server)
Expand Down Expand Up @@ -140,25 +110,16 @@ class VuuServer(config: VuuServerConfig)(implicit lifecycle: LifecycleContainer,

val realized = new RealizedViewServerModule {
override def rpcHandlers: List[RpcHandler] = module.rpcHandlersUnrealized.map(_.apply(vs))

override def restServices: List[RestService] = module.restServicesUnrealized.map(_.apply(vs))

override def name: String = module.name

override def tableDefs: List[TableDef] = module.tableDefs

override def serializationMixin: AnyRef = module.serializationMixin

override def rpcHandlersUnrealized: List[VuuServer => RpcHandler] = module.rpcHandlersUnrealized

override def restServicesUnrealized: List[VuuServer => RestService] = module.restServicesUnrealized

override def getProviderForTable(table: DataTable, viewserver: VuuServer)(implicit time: Clock, life: LifecycleContainer): Provider = {
module.getProviderForTable(table, viewserver)(time, life)
}

override def staticFileResources(): List[StaticServedResource] = module.staticFileResources()

override def viewPortDefs: Map[String, (DataTable, Provider, ProviderContainer) => ViewPortDef] = module.viewPortDefs
}

Expand Down

0 comments on commit bba484a

Please sign in to comment.