Skip to content

Commit

Permalink
Merge pull request #4927 from chimp1984/improve-shutdown-routine
Browse files Browse the repository at this point in the history
Improve shutdown routine
  • Loading branch information
ripcurlx authored Dec 10, 2020
2 parents 63fc486 + a2d2e6c commit aa0e091
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 24 deletions.
5 changes: 4 additions & 1 deletion core/src/main/java/bisq/core/app/BisqExecutable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import bisq.core.dao.DaoSetup;
import bisq.core.dao.node.full.RpcService;
import bisq.core.offer.OpenOfferManager;
import bisq.core.provider.price.PriceFeedService;
import bisq.core.setup.CorePersistedDataHost;
import bisq.core.setup.CoreSetup;
import bisq.core.support.dispute.arbitration.arbitrator.ArbitratorManager;
Expand Down Expand Up @@ -227,12 +228,14 @@ public void gracefulShutDown(ResultHandler resultHandler) {
}

try {
injector.getInstance(PriceFeedService.class).shutDown();
injector.getInstance(ArbitratorManager.class).shutDown();
injector.getInstance(TradeStatisticsManager.class).shutDown();
injector.getInstance(XmrTxProofService.class).shutDown();
injector.getInstance(RpcService.class).shutDown();
injector.getInstance(DaoSetup.class).shutDown();
injector.getInstance(AvoidStandbyModeService.class).shutDown();
log.info("OpenOfferManager shutdown started");
injector.getInstance(OpenOfferManager.class).shutDown(() -> {
log.info("OpenOfferManager shutdown completed");

Expand Down Expand Up @@ -265,7 +268,7 @@ public void gracefulShutDown(ResultHandler resultHandler) {

// Wait max 20 sec.
UserThread.runAfter(() -> {
log.warn("Timeout triggered resultHandler");
log.warn("Graceful shut down not completed in 20 sec. We trigger our timeout handler.");
if (!hasDowngraded) {
// If user tried to downgrade we do not write the persistable data to avoid data corruption
PersistenceManager.flushAllDataToDisk(() -> {
Expand Down
1 change: 0 additions & 1 deletion core/src/main/java/bisq/core/offer/OfferBookService.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ public List<Offer> getOffers() {
}

public void removeOfferAtShutDown(OfferPayload offerPayload) {
log.debug("removeOfferAtShutDown " + offerPayload);
removeOffer(offerPayload, null, null);
}

Expand Down
8 changes: 6 additions & 2 deletions core/src/main/java/bisq/core/offer/OpenOfferManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,12 @@ public void shutDown(@Nullable Runnable completeHandler) {
UserThread.execute(() -> openOffers.forEach(
openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload())
));
if (completeHandler != null)
UserThread.runAfter(completeHandler, size * 200 + 500, TimeUnit.MILLISECONDS);
if (completeHandler != null) {
// For typical number of offers we are tolerant with delay to give enough time to broadcast.
// If number of offers is very high we limit to 3 sec. to not delay other shutdown routines.
int delay = Math.min(3000, size * 200 + 500);
UserThread.runAfter(completeHandler, delay, TimeUnit.MILLISECONDS);
}
} else {
if (completeHandler != null)
completeHandler.run();
Expand Down
28 changes: 20 additions & 8 deletions core/src/main/java/bisq/core/provider/price/PriceFeedService.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class PriceFeedService {
private String baseUrlOfRespondingProvider;
@Nullable
private Timer requestTimer;
@Nullable
private PriceRequest priceRequest;


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -115,10 +117,20 @@ public PriceFeedService(@SuppressWarnings("SameParameterValue") PriceNodeHttpCli
// API
///////////////////////////////////////////////////////////////////////////////////////////

public void shutDown() {
if (requestTimer != null) {
requestTimer.stop();
requestTimer = null;
}
if (priceRequest != null) {
priceRequest.shutDown();
}
}

public void setCurrencyCodeOnInit() {
if (getCurrencyCode() == null) {
final TradeCurrency preferredTradeCurrency = preferences.getPreferredTradeCurrency();
final String code = preferredTradeCurrency != null ? preferredTradeCurrency.getCode() : "USD";
TradeCurrency preferredTradeCurrency = preferences.getPreferredTradeCurrency();
String code = preferredTradeCurrency != null ? preferredTradeCurrency.getCode() : "USD";
setCurrencyCode(code);
}
}
Expand Down Expand Up @@ -180,8 +192,8 @@ private void request(boolean repeatRequests) {
}
}, (errorMessage, throwable) -> {
if (throwable instanceof PriceRequestException) {
final String baseUrlOfFaultyRequest = ((PriceRequestException) throwable).priceProviderBaseUrl;
final String baseUrlOfCurrentRequest = priceProvider.getBaseUrl();
String baseUrlOfFaultyRequest = ((PriceRequestException) throwable).priceProviderBaseUrl;
String baseUrlOfCurrentRequest = priceProvider.getBaseUrl();
if (baseUrlOfFaultyRequest != null && baseUrlOfCurrentRequest.equals(baseUrlOfFaultyRequest)) {
log.warn("We received an error: baseUrlOfCurrentRequest={}, baseUrlOfFaultyRequest={}",
baseUrlOfCurrentRequest, baseUrlOfFaultyRequest);
Expand Down Expand Up @@ -223,7 +235,7 @@ private void retryWithNewProvider() {
UserThread.runAfter(() -> {
retryDelay = Math.min(retryDelay + 5, PERIOD_SEC);

final String oldBaseUrl = priceProvider.getBaseUrl();
String oldBaseUrl = priceProvider.getBaseUrl();
setNewPriceProvider();
log.warn("We received an error at the request from provider {}. " +
"We select the new provider {} and use that for a new request. retryDelay was {} sec.", oldBaseUrl, priceProvider.getBaseUrl(), retryDelay);
Expand Down Expand Up @@ -381,15 +393,15 @@ private boolean applyPriceToConsumer() {
}

private void requestAllPrices(PriceProvider provider, Runnable resultHandler, FaultHandler faultHandler) {
PriceRequest priceRequest = new PriceRequest();
priceRequest = new PriceRequest();
SettableFuture<Tuple2<Map<String, Long>, Map<String, MarketPrice>>> future = priceRequest.requestAllPrices(provider);
Futures.addCallback(future, new FutureCallback<Tuple2<Map<String, Long>, Map<String, MarketPrice>>>() {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Tuple2<Map<String, Long>, Map<String, MarketPrice>> result) {
UserThread.execute(() -> {
checkNotNull(result, "Result must not be null at requestAllPrices");
// Each currency rate has a different timestamp, depending on when
// the pricenode aggregate rate was calculated
// the priceNode aggregate rate was calculated
// However, the request timestamp is when the pricenode was queried
epochInMillisAtLastRequest = System.currentTimeMillis();

Expand Down
17 changes: 14 additions & 3 deletions core/src/main/java/bisq/core/provider/price/PriceProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,18 @@
@Slf4j
public class PriceProvider extends HttpClientProvider {

private boolean shutDownRequested;

// Do not use Guice here as we might create multiple instances
public PriceProvider(HttpClient httpClient, String baseUrl) {
super(httpClient, baseUrl, false);
}

public Tuple2<Map<String, Long>, Map<String, MarketPrice>> getAll() throws IOException {
if (shutDownRequested) {
return new Tuple2<>(new HashMap<>(), new HashMap<>());
}

Map<String, MarketPrice> marketPriceMap = new HashMap<>();
String hsVersion = "";
if (P2PService.getMyNodeAddress() != null)
Expand All @@ -66,10 +72,10 @@ public Tuple2<Map<String, Long>, Map<String, MarketPrice>> getAll() throws IOExc
list.forEach(obj -> {
try {
LinkedTreeMap<?, ?> treeMap = (LinkedTreeMap<?, ?>) obj;
final String currencyCode = (String) treeMap.get("currencyCode");
final double price = (Double) treeMap.get("price");
String currencyCode = (String) treeMap.get("currencyCode");
double price = (Double) treeMap.get("price");
// json uses double for our timestampSec long value...
final long timestampSec = MathUtils.doubleToLong((Double) treeMap.get("timestampSec"));
long timestampSec = MathUtils.doubleToLong((Double) treeMap.get("timestampSec"));
marketPriceMap.put(currencyCode, new MarketPrice(currencyCode, price, timestampSec, true));
} catch (Throwable t) {
log.error(t.toString());
Expand All @@ -83,4 +89,9 @@ public Tuple2<Map<String, Long>, Map<String, MarketPrice>> getAll() throws IOExc
public String getBaseUrl() {
return httpClient.getBaseUrl();
}

public void shutDown() {
shutDownRequested = true;
httpClient.shutDown();
}
}
39 changes: 33 additions & 6 deletions core/src/main/java/bisq/core/provider/price/PriceRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,64 @@
import com.google.common.util.concurrent.SettableFuture;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@Slf4j
public class PriceRequest {
private static final ListeningExecutorService executorService = Utilities.getListeningExecutorService("PriceRequest", 3, 5, 10 * 60);
@Nullable
private PriceProvider provider;
private boolean shutDownRequested;

public PriceRequest() {
}

public SettableFuture<Tuple2<Map<String, Long>, Map<String, MarketPrice>>> requestAllPrices(PriceProvider provider) {
final String baseUrl = provider.getBaseUrl();
final SettableFuture<Tuple2<Map<String, Long>, Map<String, MarketPrice>>> resultFuture = SettableFuture.create();
this.provider = provider;
String baseUrl = provider.getBaseUrl();
SettableFuture<Tuple2<Map<String, Long>, Map<String, MarketPrice>>> resultFuture = SettableFuture.create();
ListenableFuture<Tuple2<Map<String, Long>, Map<String, MarketPrice>>> future = executorService.submit(() -> {
Thread.currentThread().setName("PriceRequest-" + provider.getBaseUrl());
Thread.currentThread().setName("PriceRequest-" + baseUrl);
return provider.getAll();
});

Futures.addCallback(future, new FutureCallback<Tuple2<Map<String, Long>, Map<String, MarketPrice>>>() {
Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Tuple2<Map<String, Long>, Map<String, MarketPrice>> marketPriceTuple) {
log.trace("Received marketPriceTuple of {}\nfrom provider {}", marketPriceTuple, provider);
resultFuture.set(marketPriceTuple);
if (!shutDownRequested) {
resultFuture.set(marketPriceTuple);
}

}

public void onFailure(@NotNull Throwable throwable) {
resultFuture.setException(new PriceRequestException(throwable, baseUrl));
if (!shutDownRequested) {
resultFuture.setException(new PriceRequestException(throwable, baseUrl));
}
}
}, MoreExecutors.directExecutor());

return resultFuture;
}

public void shutDown() {
shutDownRequested = true;
if (provider != null) {
provider.shutDown();
}

executorService.shutdown();
try {
if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}
}
2 changes: 2 additions & 0 deletions p2p/src/main/java/bisq/network/http/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ String requestWithGETNoProxy(String param,
String getUid();

String getBaseUrl();

void shutDown();
}
27 changes: 24 additions & 3 deletions p2p/src/main/java/bisq/network/http/HttpClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public class HttpClientImpl implements HttpClient {
private String baseUrl;
private boolean ignoreSocks5Proxy;
private final String uid;
@Nullable
private HttpURLConnection connection;
@Nullable
private CloseableHttpClient httpclient;

@Inject
public HttpClientImpl(@Nullable Socks5ProxyProvider socks5ProxyProvider) {
Expand All @@ -76,6 +80,19 @@ public HttpClientImpl(String baseUrl) {
uid = UUID.randomUUID().toString();
}

@Override
public void shutDown() {
if (connection != null) {
connection.disconnect();
}
if (httpclient != null) {
try {
httpclient.close();
} catch (IOException ignore) {
}
}
}

@Override
public void setBaseUrl(String baseUrl) {
this.baseUrl = baseUrl;
Expand Down Expand Up @@ -117,7 +134,6 @@ public String requestWithGET(String param,
public String requestWithGETNoProxy(String param,
@Nullable String headerKey,
@Nullable String headerValue) throws IOException {
HttpURLConnection connection = null;
log.debug("Executing HTTP request " + baseUrl + param + " proxy: none.");
URL url = new URL(baseUrl + param);
try {
Expand Down Expand Up @@ -177,7 +193,8 @@ private String doRequestWithGETProxy(String param,
PoolingHttpClientConnectionManager cm = socks5Proxy.resolveAddrLocally() ?
new PoolingHttpClientConnectionManager(reg) :
new PoolingHttpClientConnectionManager(reg, new FakeDnsResolver());
try (CloseableHttpClient httpclient = HttpClients.custom().setConnectionManager(cm).build()) {
try {
httpclient = HttpClients.custom().setConnectionManager(cm).build();
InetSocketAddress socksAddress = new InetSocketAddress(socks5Proxy.getInetAddress(), socks5Proxy.getPort());

// remove me: Use this to test with system-wide Tor proxy, or change port for another proxy.
Expand All @@ -191,11 +208,15 @@ private String doRequestWithGETProxy(String param,
request.setHeader(headerKey, headerValue);

log.debug("Executing request " + request + " proxy: " + socksAddress);
try (CloseableHttpResponse response = httpclient.execute(request, context)) {
try (CloseableHttpResponse response = checkNotNull(httpclient).execute(request, context)) {
return convertInputStreamToString(response.getEntity().getContent());
}
} catch (Throwable t) {
throw new IOException("Error at requestWithGETProxy with URL: " + (baseUrl + param) + ". Throwable=" + t.getMessage());
} finally {
if (httpclient != null) {
httpclient.close();
}
}
}

Expand Down

0 comments on commit aa0e091

Please sign in to comment.