Skip to content

Commit

Permalink
Stop using onSpinWait (#203)
Browse files Browse the repository at this point in the history
Alters Thread.onSpinWait to use latch and await to avoid using CPU cycles to wait.
Also adds additional debug/trace logging to Solr route
Adds the default transform as the default on the property for Solr indexing
Adds some additional documentation on Solr indexing.
  • Loading branch information
whikloj authored May 23, 2024
1 parent c6f5467 commit 40041f9
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 27 deletions.
32 changes: 22 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,29 @@ This application listens to Fedora's event stream and
indexes objects into an external Solr server.

#### Properties
| Name | Description| Default Value |
| :--- | :---| :---- |
| Name | Description | Default Value |
| :--- |:---| :---- |
| solr.indexing.enabled | Enables/disables the SOLR indexing service. Disabled by default | false |
| solr.fcrepo.checkHasIndexingTransformation | When true, check for an indexing transform in the resource matadata. | true |
| solr.fcrepo.defaultTransform | The solr default XSL transform when none is provide in resource metadata. | null |
| solr.input.stream | The JMS topic or queue serving as the message source | broker:topic:fedora |
| solr.reindex.stream | The JMS topic or queue serving as the reindex message source | broker:queue:solr.reindex |
| solr.commitWithin | Milliseconds within which commits should occur | 10000 |
| solr.indexing.predicate | When true, check that resource is of type http://fedora.info/definitions/v4/indexing#Indexable; otherwise do not index it. | false |
| solr.filter.containers | A comma-separate list of containers that should be ignored by the indexer | http://localhost:8080/fcrepo/rest/audit |

| solr.fcrepo.checkHasIndexingTransformation | When true, check for an indexing transform in the resource metadata with the predicate http://fedora.info/definitions/v4/indexing#hasIndexingTransformation | true |
| solr.fcrepo.defaultTransform | The solr default XSL transform when none is provide in resource metadata. | null |
| solr.input.stream | The JMS topic or queue serving as the message source | broker:topic:fedora |
| solr.reindex.stream | The JMS topic or queue serving as the reindex message source | broker:queue:solr.reindex |
| solr.commitWithin | Milliseconds within which commits should occur | 10000 |
| solr.indexing.predicate | When true, check that resource is of type http://fedora.info/definitions/v4/indexing#Indexable; otherwise do not index it.| false |
| solr.filter.containers | A comma-separate list of containers that should be ignored by the indexer| http://localhost:8080/fcrepo/rest/audit |

**Note**: You must start with the `file://` protocol when defining the path to a custom XSLT for either the `solr.fcrepo.defaultTransform`
or within the resource using the `http://fedora.info/definitions/v4/indexing#hasIndexingTransformation` predicate.

For example,
```text
solr.fcrepo.defaultTransform=file:///path/to/your/transform.xsl
```
or
```text
@prefix indexing: <http://fedora.info/definitions/v4/indexing#> .
<> indexing:hasIndexingTransformation <file:///path/to/your/transform.xsl> .
```

### Repository Indexer (Triplestore)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import java.nio.file.Path;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

import static org.fcrepo.camel.common.config.BasePropsConfig.FCREPO_CAMEL_CONFIG_FILE_PROPERTY;
import static org.slf4j.LoggerFactory.getLogger;
Expand Down Expand Up @@ -42,12 +43,20 @@ public Integer call() {
System.setProperty(FCREPO_CAMEL_CONFIG_FILE_PROPERTY, configurationFilePath.toFile().getAbsolutePath());
}
final var appContext = new AnnotationConfigApplicationContext("org.fcrepo.camel");
appContext.registerShutdownHook();
final var countdownLatch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOGGER.info("Shutting down fcrepo-camel-toolbox...");
appContext.stop();
countdownLatch.countDown();
}));
appContext.start();
LOGGER.info("fcrepo-camel-toolbox started.");

while (appContext.isRunning()) {
Thread.onSpinWait();
try {
countdownLatch.await();
} catch (final InterruptedException e) {
// Ignore error because we are exiting anyways.
return 1;
}
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ static class SolrIndexingEnabled extends ConditionOnPropertyTrue {
@Value("${solr.fcrepo.checkHasIndexingTransformation:true}")
private boolean checkHasIndexingTransformation;

@Value("${solr.fcrepo.defaultTransform:}")
@Value("${solr.fcrepo.defaultTransform:org/fcrepo/camel/indexing/solr/default_transform.xsl}")
private String defaultTransform;

@Value("${solr.input.stream:broker:topic:fedora}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ public class SolrRouter extends RouteBuilder {
*/
public void configure() throws Exception {

logger.debug("Solr Router starting...");
logger.trace("solr.indexing.predicate = '{}'", config.isIndexingPredicate());
logger.trace("solr.checkHasIndexingTransformation = '{}'", config.isCheckHasIndexingTransformation());
logger.trace("solr.defaultTransform = '{}'", config.getDefaultTransform());
logger.trace("solr.input.stream = '{}'", config.getInputStream());
logger.trace("solr.baseUrl = '{}'", config.getSolrBaseUrl());

final Namespaces ns = new Namespaces("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#");
ns.add("indexing", "http://fedora.info/definitions/v4/indexing#");
ns.add("ldp", "http://www.w3.org/ns/ldp#");
Expand Down Expand Up @@ -100,9 +107,10 @@ public void configure() throws Exception {
header(FCREPO_URI).isEqualTo(constant(uri))))
.collect(toList()))))
.choice()
.when(and(simple(config.isIndexingPredicate() + " != 'true'"),
simple(config.isCheckHasIndexingTransformation() + " != 'true'")))
.when(and(not(constant(config.isIndexingPredicate())),
not(constant(config.isCheckHasIndexingTransformation()))))
.setHeader(INDEXING_TRANSFORMATION).simple(config.getDefaultTransform())
.log(LoggingLevel.TRACE, "Indexing Transformation set to: ${header.CamelIndexingTransformation}")
.log(LoggingLevel.INFO, "sending to update_solr")
.to("direct:update.solr")
.otherwise()
Expand All @@ -111,13 +119,17 @@ public void configure() throws Exception {
+ "?preferOmit=PreferContainment&accept=application/rdf+xml"
)
.setHeader(INDEXING_TRANSFORMATION).xpath(hasIndexingTransformation, String.class, ns)
.log(LoggingLevel.TRACE, logger, "Indexing Transformation: ${header.CamelIndexingTransformation}")
.choice()
.when(or(header(INDEXING_TRANSFORMATION).isNull(),
header(INDEXING_TRANSFORMATION).isEqualTo("")))
.setHeader(INDEXING_TRANSFORMATION).simple(config.getDefaultTransform()).end()
.setHeader(INDEXING_TRANSFORMATION).simple(config.getDefaultTransform())
.log(LoggingLevel.TRACE, logger, "No indexing transform found on the resource, using " +
"default transform: ${header.CamelIndexingTransformation}")
.end()
.removeHeaders("CamelHttp*")
.choice()
.when(or(simple(config.isIndexingPredicate() + " != 'true'"),
.when(or(not(constant(config.isIndexingPredicate())),
header(FCREPO_RESOURCE_TYPE).contains(INDEXABLE)))
.to("direct:update.solr")
.otherwise()
Expand Down Expand Up @@ -146,14 +158,12 @@ public void configure() throws Exception {
// Don't index the transformation itself
.filter().simple("${header.CamelIndexingTransformation} != ${header.CamelIndexingUri}")
.choice()
.when(header(INDEXING_TRANSFORMATION).isNotNull())
.when(and(header(INDEXING_TRANSFORMATION).isNotNull(),
header(INDEXING_TRANSFORMATION).isNotEqualTo("")))
.log(LoggingLevel.INFO, logger,
"Sending RDF for Transform with with XSLT from ${header.CamelIndexingTransformation}")
.toD("xslt:${header.CamelIndexingTransformation}")
.to("direct:send.to.solr")
.when(or(header(INDEXING_TRANSFORMATION).isNull(), header(INDEXING_TRANSFORMATION).isEqualTo("")))
.log(LoggingLevel.INFO, logger,"No Transform supplied")
.to("direct:send.to.solr")
.otherwise()
.log(LoggingLevel.INFO, logger, "Skipping ${header.CamelFcrepoUri}");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,7 @@ public void testUpdateRouter() throws Exception {
final var context = camelContext.adapt(ModelCamelContext.class);

AdviceWith.adviceWith(context, "FcrepoSolrUpdater", a -> {
a.mockEndpointsAndSkip("fcrepo*");
a.mockEndpointsAndSkip("http4*");
a.mockEndpointsAndSkip("xslt:*");
});

AdviceWith.adviceWith(context, "FcrepoSolrSend", a -> {
Expand All @@ -275,8 +274,11 @@ public void testUpdateRouter() throws Exception {
solrUpdateEndPoint.expectedMessageCount(1);
solrUpdateEndPoint.expectedHeaderReceived(Exchange.HTTP_METHOD, "POST");

template.sendBodyAndHeaders("direct:update.solr", "",
createEvent(baseURL + fileID, eventTypes));
final var headers = createEvent(baseURL + fileID, eventTypes);
// Need to add the header as it is set in FcrepoSolrIndexer
headers.put("CamelIndexingTransformation", "org/fcrepo/camel/indexing/solr/default_transform.xsl");

template.sendBodyAndHeaders( "direct:update.solr", "", headers);

MockEndpoint.assertIsSatisfied(solrUpdateEndPoint);
}
Expand Down

0 comments on commit 40041f9

Please sign in to comment.