Skip to content

Commit

Permalink
JSON API: backport #18866 to 2.3.x (#18903)
Browse files Browse the repository at this point in the history
#18866

The tests are in `main-2.x` but were not backported here because there was quite a lot of structural change in the tests since 2.3, particularly to test pruning.
  • Loading branch information
raphael-speyer-da committed Mar 28, 2024
1 parent 0250f8c commit 1e70659
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,26 @@ sealed abstract class Queries(tablePrefix: String, tpIdCacheMaxEntries: Long)(im
.map(_.toMap)
}

def hasVisibleContracts(parties: PartySet, tpid: SurrogateTpId): ConnectionIO[Boolean]
def oldestVisibleOffset(
parties: PartySet,
tpid: SurrogateTpId,
)(implicit
log: LogHandler
): ConnectionIO[Option[String]] = {
sql"""
SELECT MIN(last_offset) FROM $ledgerOffsetTableName WHERE tpid = $tpid AND party IN (${commonWitnesses(
parties,
tpid,
)})
"""
.query[Option[String]]
.unique
}

/** A query returning a single-column table of all the parties that can also see
* contracts that `parties` can see, of template tpid.
*/
protected def commonWitnesses(parties: PartySet, tpid: SurrogateTpId): Fragment

/** Template IDs, parties, and offsets that don't match expected offset values for
* a particular `tpid`.
Expand Down Expand Up @@ -820,19 +839,17 @@ private final class PostgresQueries(tablePrefix: String, tpIdCacheMaxEntries: Lo
VALUES ($packageId, $moduleName, $entityName)
ON CONFLICT (package_id, template_module_name, template_entity_name) DO NOTHING"""

override def hasVisibleContracts(
protected override def commonWitnesses(
parties: PartySet,
tpid: SurrogateTpId,
): ConnectionIO[Boolean] = {
): Fragment = {
import ipol.pas
val partyVector: Vector[String] = parties.toVector
sql"""SELECT EXISTS(
SELECT 1 FROM $contractTableName AS c
WHERE (signatories && $partyVector::text[] OR observers && $partyVector::text[])
AND ($tpid = tpid)
)"""
.query[Boolean]
.unique
sql"""
SELECT DISTINCT unnest(signatories || observers) AS witness FROM $contractTableName AS c
WHERE (signatories && $partyVector::text[] OR observers && $partyVector::text[])
AND ($tpid = tpid)
"""
}
}

Expand Down Expand Up @@ -1120,21 +1137,18 @@ private final class OracleQueries(
INTO $templateIdTableName (package_id, template_module_name, template_entity_name)
VALUES ($packageId, $moduleName, $entityName)"""

override def hasVisibleContracts(
protected override def commonWitnesses(
parties: PartySet,
tpid: SurrogateTpId,
): ConnectionIO[Boolean] = {
): Fragment = {
import Queries.CompatImplicits.catsReducibleFromFoldable1
sql"""SELECT 1
FROM $contractTableName c
JOIN $contractStakeholdersViewName cst
ON (c.contract_id = cst.contract_id AND c.tpid = cst.tpid)
WHERE (${Fragments.in(fr"cst.stakeholder", parties.toNEF)})
AND c.tpid = $tpid
FETCH NEXT 1 ROWS ONLY"""
.query[Int]
.option
.map(_.isDefined)
sql"""
SELECT DISTINCT c2.stakeholder
FROM $contractStakeholdersViewName c1
JOIN $contractStakeholdersViewName c2 ON c1.contract_id = c2.contract_id
WHERE c1.tpid = $tpid
AND ${Fragments.in(fr"""c1.stakeholder""", parties.toNEF)}
"""
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,7 @@ private class ContractsFetch(
mat: Materializer,
lc: LoggingContextOf[InstanceUUID],
): ConnectionIO[BeginBookmark[Terminates.AtAbsolute]] = {
import cats.instances.list._, cats.syntax.foldable.{toFoldableOps => ToFoldableOps},
cats.syntax.traverse.{toTraverseOps => ToTraverseOps}, cats.syntax.functor._, doobie.implicits._
import cats.syntax.traverse.{toTraverseOps => ToTraverseOps}, cats.syntax.functor._
// we can fetch for all templateIds on a single acsFollowingAndBoundary
// by comparing begin offsets; however this is trickier so we don't do it
// right now -- Stephen / Leo
Expand All @@ -153,17 +152,15 @@ private class ContractsFetch(
// fetch cannot go "too far" the second time
templateIds
.traverse({ case t =>
ContractDao.hasVisibleContracts(fetchContext.parties, t).flatMap { case visibleContracts =>
// If there are pre-existing contracts visible to these parties, we cannot load the cache
// from ACS. We need to initialise from transactions instead to ensure we don't miss any
// archivals which might have happened before this fetch.
val disableAcs = visibleContracts
fetchAndPersist(fetchContext, disableAcs, absEnd, t)
}
for {
oldestVisible <- ContractDao.oldestVisibleOffset(fetchContext.parties, t)
actualAbsEnds <- fetchAndPersist(fetchContext, oldestVisible, absEnd, t)
} yield (actualAbsEnds, oldestVisible)
})
.flatMap { actualAbsEnds =>
.flatMap { actualAbsEndAndOldestVisibles =>
import domain.Offset.`Offset ordering`
val (actualAbsEnds, oldestVisibles) = actualAbsEndAndOldestVisibles.unzip
val newAbsEndTarget = {
import scalaz.std.list._, scalaz.syntax.foldable._, domain.Offset.`Offset ordering`
// it's fine if all yielded LedgerBegin, so we don't want to conflate the "fallback"
// with genuine results
actualAbsEnds.maximum getOrElse AbsoluteBookmark(absEnd.toDomain)
Expand All @@ -172,6 +169,7 @@ private class ContractsFetch(
case LedgerBegin =>
fconn.pure(AbsoluteBookmark(absEnd))
case AbsoluteBookmark(feedback) =>
val oldestVisibleOffset = oldestVisibles.flatten.minimum
val feedbackTerminator = Terminates fromDomain feedback
// contractsFromOffsetIo can go _past_ absEnd, because the ACS ignores
// this argument; see https://github.com/digital-asset/daml/pull/8226#issuecomment-756446537
Expand All @@ -180,12 +178,12 @@ private class ContractsFetch(
// to "catch them up" to the one that "raced" ahead
(actualAbsEnds zip templateIds)
.collect { case (`newAbsEndTarget`, templateId) => templateId }
.traverse_ {
.traverse {
// passing a priorBookmark prevents contractsIo_ from using the ACS,
// and it cannot go "too far" reading only the tx stream
fetchAndPersist(
fetchContext,
true,
oldestVisibleOffset,
feedbackTerminator,
_,
)
Expand All @@ -197,7 +195,7 @@ private class ContractsFetch(

private[this] def fetchAndPersist(
fetchContext: FetchContext,
disableAcs: Boolean,
oldestVisible: Option[domain.Offset],
absEnd: Terminates.AtAbsolute,
templateId: domain.TemplateId.RequiredPkg,
)(implicit
Expand All @@ -210,7 +208,7 @@ private class ContractsFetch(

def loop(maxAttempts: Int): ConnectionIO[BeginBookmark[domain.Offset]] = {
logger.debug(s"contractsIo, maxAttempts: $maxAttempts")
(contractsIo_(fetchContext, disableAcs, absEnd, templateId) <* fconn.commit)
(contractsIo_(fetchContext, oldestVisible, absEnd, templateId) <* fconn.commit)
.exceptSql {
case e if maxAttempts > 0 && retrySqlStates(e.getSQLState) =>
logger.debug(s"contractsIo, exception: ${e.description}, state: ${e.getSQLState}")
Expand All @@ -226,7 +224,7 @@ private class ContractsFetch(

private def contractsIo_(
fetchContext: FetchContext,
disableAcs: Boolean,
oldestVisible: Option[domain.Offset],
absEnd: Terminates.AtAbsolute,
templateId: domain.TemplateId.RequiredPkg,
)(implicit
Expand All @@ -241,11 +239,11 @@ private class ContractsFetch(
fetchContext,
templateId,
offsets,
disableAcs,
oldestVisible,
absEnd,
)
_ = logger.debug(
s"contractsFromOffsetIo($fetchContext, $templateId, $offsets, $disableAcs, $absEnd): $offset1"
s"contractsFromOffsetIo($fetchContext, $templateId, $offsets, $oldestVisible, $absEnd): $offset1"
)
} yield offset1
}
Expand Down Expand Up @@ -288,7 +286,7 @@ private class ContractsFetch(
fetchContext: FetchContext,
templateId: domain.TemplateId.RequiredPkg,
offsets: Map[domain.Party, domain.Offset],
disableAcs: Boolean,
oldestVisible: Option[domain.Offset],
absEnd: Terminates.AtAbsolute,
)(implicit
ec: ExecutionContext,
Expand Down Expand Up @@ -320,8 +318,11 @@ private class ContractsFetch(
)(lc)

// include ACS iff starting at LedgerBegin
val (idses, lastOff) = (startOffset, disableAcs) match {
case (LedgerBegin, false) =>
val (idses, lastOff) = (startOffset, oldestVisible) match {
// This template has not been loaded before by these parties, nor has any other party
// loaded a contract of this template that is visible by any of these parties.
// The cache is empty and may be loaded from a snapshot of the ACS.
case (LedgerBegin, None) =>
val stepsAndOffset = builder add acsFollowingAndBoundary(txnK)
stepsAndOffset.in <~ getActiveContracts(
jwt,
Expand All @@ -331,7 +332,25 @@ private class ContractsFetch(
)(lc)
(stepsAndOffset.out0, stepsAndOffset.out1)

case (AbsoluteBookmark(_), _) | (LedgerBegin, true) =>
case (LedgerBegin, Some(oldestVisibleOffset)) =>
// This template has not been loaded by these parties, but another party has loaded
// this template AND in the process contracts have been loaded into cache that were
// visible to these parties.
// In this case, it's possible that the relevant contracts have since been archived
// from the ledger. If we load the cache from an ACS snapshot, we would never learn
// about those archivals, so we need to load from transactions instead.
// We only need to see transactions after the oldest cache offset of such contracts,
// as any archivals prior to that would have already been deleted from the cache.
val stepsAndOffset = builder add transactionsFollowingBoundary(txnK)
stepsAndOffset.in <~ Source.single(AbsoluteBookmark(oldestVisibleOffset))
(
(stepsAndOffset: FanOutShape2[_, ContractStreamStep.LAV1, _]).out0,
stepsAndOffset.out1,
)

case (AbsoluteBookmark(_), _) =>
// This template has been loaded by these parties before.
// Update cache with transactions since the last offset.
val stepsAndOffset = builder add transactionsFollowingBoundary(txnK)
stepsAndOffset.in <~ Source.single(startOffset)
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,16 @@ object ContractDao {
}
}

def hasVisibleContracts(parties: domain.PartySet, templateId: domain.TemplateId.RequiredPkg)(
def oldestVisibleOffset(parties: domain.PartySet, templateId: domain.TemplateId.RequiredPkg)(
implicit
log: LogHandler,
sjd: SupportedJdbcDriver.TC,
lc: LoggingContextOf[InstanceUUID],
): ConnectionIO[Boolean] = {
): ConnectionIO[Option[domain.Offset]] = {
for {
tpId <- surrogateTemplateId(templateId)
hasVisible <- sjd.q.queries.hasVisibleContracts(queriesPartySet(parties), tpId)
} yield hasVisible
oldestVisible <- sjd.q.queries.oldestVisibleOffset(queriesPartySet(parties), tpId)
} yield oldestVisible.map(domain.Offset(_))
}

/** A "lagging offset" is a template-ID/party pair whose stored offset may not reflect
Expand Down

0 comments on commit 1e70659

Please sign in to comment.