Skip to content

Commit

Permalink
fix(sawtooth-daml-tp): defer events until after all other activities
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin O'Donnell <[email protected]>
  • Loading branch information
scealiontach committed Nov 6, 2020
1 parent 9e8e47c commit 7154213
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ public static String makeAddress(final String ns, final String... parts) {
return ns + hash;
}

/**
* Make an address given a namespace, base key and part.
*
* @param key the base key bytestring
* @param part the part of the key (0 is the base)
* @return the address
*/
public static String makeLeafAddress(final ByteString key, final int part) {
return makeAddress(Namespace.DAML_STATE_VALUE_NS, key.toStringUtf8(), "part",
Integer.toString(part));
}

/**
* Construct a context address for the given DamlStateKey.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public final class ContextLedgerState implements LedgerState<String> {

private static final Logger LOGGER = LoggerFactory.getLogger(ContextLedgerState.class.getName());

private List<DamlEvent> deferredEvents;
/**
* The state which this class wraps and delegates to.
*/
Expand All @@ -69,6 +70,7 @@ public final class ContextLedgerState implements LedgerState<String> {
*/
public ContextLedgerState(final Context aState) {
this.state = aState;
this.deferredEvents = new ArrayList<>();
}

private ByteString getStateOrNull(final String address)
Expand All @@ -77,15 +79,44 @@ private ByteString getStateOrNull(final String address)
if (stateMap.containsKey(address)) {
final ByteString bs = stateMap.get(address);
if (bs.isEmpty() || bs == null) {
LOGGER.debug("address={} is set isEmpty={} size={}", address, bs.isEmpty(),
bs.toByteArray().length);
return null;
} else {
LOGGER.debug("address={} is set isEmpty={} size={}", address, bs.isEmpty(),
bs.toByteArray().length);
return bs;
}
} else {
LOGGER.debug("address={} is not set", address);
return null;
}
}

private ByteString getStateOrNull(final String address, final int retries)
throws InternalError, InvalidTransactionException {
final Map<String, ByteString> stateMap = state.getState(List.of(address));
int trials = 0;
while (trials < retries) {
if (stateMap.containsKey(address)) {
final ByteString bs = stateMap.get(address);
if (bs.isEmpty() || bs == null) {
LOGGER.debug("address={} is set isEmpty={} size={}", address, bs.isEmpty(),
bs.toByteArray().length);
} else {
LOGGER.debug("address={} is set isEmpty={} size={}", address, bs.isEmpty(),
bs.toByteArray().length);
return bs;
}
} else {
LOGGER.debug("address={} is not set", address);
}
trials++;
state.getState(List.of(address));
}
return null;
}

@Override
public ByteString getDamlState(final ByteString key)
throws InternalError, InvalidTransactionException {
Expand All @@ -104,24 +135,23 @@ public ByteString getDamlState(final ByteString key)
bs.toByteArray().length);
while (hasMore) {
int part = veList.size();
final String nextAddr = Namespace.makeAddress(Namespace.DAML_STATE_VALUE_NS, key.toStringUtf8(), "part",
String.valueOf(part));
bs = getStateOrNull(nextAddr);
final String nextAddr = Namespace.makeLeafAddress(key, part);
final ByteString subBS = getStateOrNull(nextAddr, 10);
int sz = 0;
if (bs == null) {
if (subBS == null) {
hasMore = false;
} else {
sz = bs.toByteArray().length;
envelope = VersionedEnvelope.parseFrom(bs);
veList.add(envelope);
hasMore = envelope.getHasMore();
sz = subBS.toByteArray().length;
VersionedEnvelope env = VersionedEnvelope.parseFrom(subBS);
veList.add(env);
hasMore = env.getHasMore();
}
LOGGER.debug("Fetched next address={} part={} hasMore={} size={}", nextAddr, part,
hasMore, sz);
}
ByteString val = SawtoothClientUtils.unwrapMultipart(veList);
LOGGER.info("Read address={} parts={} size={}", addr, veList.size(),
val.toByteArray().length);
val.toByteArray().length);
return val;
} catch (InvalidProtocolBufferException e) {
throw new InvalidTransactionException(e.getMessage());
Expand Down Expand Up @@ -171,8 +201,8 @@ public DamlTransaction assembleTransactionFragments(final DamlTransactionFragmen
LOGGER.warn("Assembled hash does not match! {} != {}", contentHash, assembledHash);
}
result = ByteString.copyFrom(accumulatedBytes);
DamlTransaction tx = DamlTransaction.parseFrom(result);
return tx;
DamlTransaction tx = DamlTransaction.parseFrom(result);
return tx;
} catch (InvalidProtocolBufferException e) {
throw new InvalidTransactionException(e.getMessage());
}
Expand Down Expand Up @@ -202,7 +232,6 @@ public Map<ByteString, ByteString> getDamlStates(final ByteString... keys)
@Override
public void setDamlStates(final Collection<Entry<ByteString, ByteString>> entries)
throws InternalError, InvalidTransactionException {
final Map<String, ByteString> setMap = new HashMap<>();
String firstAddress = "";
for (final Entry<ByteString, ByteString> e : entries) {
final ByteString key = e.getKey();
Expand All @@ -211,30 +240,30 @@ public void setDamlStates(final Collection<Entry<ByteString, ByteString>> entrie
int index = 0;
int size = 0;
for (ByteString p : parts) {
final Map<String, ByteString> setMap = new HashMap<>();
final String address;
if (index == 0) {
address = Namespace.makeDamlStateAddress(key);
firstAddress = address;
} else {
address = Namespace.makeAddress(Namespace.DAML_STATE_VALUE_NS, key.toStringUtf8(), "part",
String.valueOf(index));
address = Namespace.makeLeafAddress(key, index);
}
LOGGER.debug("Set address={} part={} size={}", address, index, p.size());
setMap.put(address, p);
index++;
size += p.size();
state.setState(setMap.entrySet());
}
LOGGER.info("Set address={} totalParts={} totalSize={}", firstAddress, index, size);
}
state.setState(setMap.entrySet());
}

@Override
public void storeTransactionFragmet(final DamlTransactionFragment tx)
throws InternalError, InvalidTransactionException {
final String address = Namespace.makeAddress(Namespace.DAML_TX_NS, "fragment",
tx.getLogEntryId().toStringUtf8(), String.valueOf(tx.getParts()),
String.valueOf(tx.getPartNumber()));
final String address =
Namespace.makeAddress(Namespace.DAML_TX_NS, "fragment", tx.getLogEntryId().toStringUtf8(),
String.valueOf(tx.getParts()), String.valueOf(tx.getPartNumber()));
final ByteString val = tx.toByteString();
LOGGER.info("Storing fragment at tx={} address={} size={}", tx.getLogEntryId().toStringUtf8(),
address, val.size());
Expand Down Expand Up @@ -400,8 +429,8 @@ private void largeEvent(final String entryId, final List<ByteString> multipart)
StringBuilder fetchAddressBldr = new StringBuilder();
int totalBytes = 0;
for (ByteString bs : multipart) {
String address = Namespace.makeAddress(Namespace.DAML_EVENT_NS, "logentry", entryId,
"part", String.valueOf(index));
String address = Namespace.makeAddress(Namespace.DAML_EVENT_NS, "logentry", entryId, "part",
String.valueOf(index));
if (index > 0) {
fetchAddressBldr.append(",");
}
Expand All @@ -413,7 +442,31 @@ private void largeEvent(final String entryId, final List<ByteString> multipart)
attrMap.put(EventConstants.DAML_LOG_FETCH_IDS_ATTRIBUTE, fetchAddressBldr.toString());
state.setState(setMap.entrySet());
LOGGER.info("Stored {} entries totalling {} bytes", multipart.size(), totalBytes);
state.addEvent(EventConstants.DAML_LOG_EVENT_SUBJECT, attrMap.entrySet(), ByteString.EMPTY);
DamlEvent de = new DamlEvent(EventConstants.DAML_LOG_EVENT_SUBJECT, attrMap, ByteString.EMPTY);
this.deferredEvents.add(de);
}

public void flushDeferredEvents() throws InternalError {
for (DamlEvent evt : this.deferredEvents) {
evt.flush(state);
}
}

private class DamlEvent {
private String subject;
private Map<String, String> attrMap;
private ByteString data;

DamlEvent(final String subj, final Map<String, String> attrs, final ByteString bs) {
this.subject = subj;
this.attrMap = attrs;
this.data = bs;
}

public void flush(final Context ledgerState) throws InternalError {
LOGGER.info("Sending event on {} with {} attributes and data size={}", subject,
attrMap.size(), data.size());
ledgerState.addEvent(subject, attrMap.entrySet(), data);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public void apply(final TpProcessRequest tpProcessRequest, final Context state)
LOGGER.debug("DamlOperation contains no supported operation, ignoring ...");
}
}
ledgerState.flushDeferredEvents();
LOGGER.info("Completed {} operations", batch.getOperationsList().size());
} catch (final InvalidProtocolBufferException ipbe) {
LOGGER.error("Failed to parse DamlSubmission protocol buffer:");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,9 @@ void storeTransactionFragmet(DamlTransactionFragment tx)
DamlTransaction assembleTransactionFragments(DamlTransactionFragment endTx)
throws InternalError, InvalidTransactionException;

/**
* Flush any deferred events, for instance to put them after state updates.
* @throws InternalError
*/
void flushDeferredEvents() throws InternalError;
}
Binary file added test.out
Binary file not shown.

0 comments on commit 7154213

Please sign in to comment.