From 8bcee0665d7cb7e4c3be0c2ee11f977b2d06428a Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Thu, 7 Nov 2019 13:20:08 -0500 Subject: [PATCH 01/17] rebase --- .../io/confluent/rest/ApplicationServer.java | 13 +++ .../java/io/confluent/rest/FileWatcher.java | 97 +++++++++++++++++++ .../java/io/confluent/rest/RestConfig.java | 10 ++ 3 files changed, 120 insertions(+) create mode 100644 core/src/main/java/io/confluent/rest/FileWatcher.java diff --git a/core/src/main/java/io/confluent/rest/ApplicationServer.java b/core/src/main/java/io/confluent/rest/ApplicationServer.java index 7d8a2f0d09..b34bb6d007 100644 --- a/core/src/main/java/io/confluent/rest/ApplicationServer.java +++ b/core/src/main/java/io/confluent/rest/ApplicationServer.java @@ -39,6 +39,8 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -250,6 +252,17 @@ private SslContextFactory createSslContextFactory(RestConfig config) { sslContextFactory.setKeyManagerFactoryAlgorithm( config.getString(RestConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG)); } + + if (config.getBoolean(RestConfig.SSL_KEYSTORE_RELOAD_CONFIG)) { + Path keystorePath = Paths.get(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG); + try { + FileWatcher.onFileChange(keystorePath, () -> + sslContextFactory.reload(scf -> log.info("Reloaded SSL cert"))); + log.info("Enabled SSL cert auto reload: " + keystorePath); + } catch (java.io.IOException e) { + log.error("Can not enabled SSL cert auto reload", e); + } + } } configureClientAuth(sslContextFactory, config); diff --git a/core/src/main/java/io/confluent/rest/FileWatcher.java b/core/src/main/java/io/confluent/rest/FileWatcher.java new file mode 100644 index 0000000000..c813935e14 --- /dev/null +++ b/core/src/main/java/io/confluent/rest/FileWatcher.java @@ -0,0 +1,97 @@ +/** + * Copyright 2019 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.rest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchService; +import java.nio.file.WatchKey; +import java.nio.file.WatchEvent; + +// reference https://gist.github.com/danielflower/f54c2fe42d32356301c68860a4ab21ed +public class FileWatcher { + private static final Logger log = LoggerFactory.getLogger(FileWatcher.class); + + private Thread thread; + private WatchService watchService; + + public interface Callback { + void run() throws Exception; + } + + /** + * Starts watching a file and the given path and calls the callback when it is changed. + * A shutdown hook is registered to stop watching. To control this yourself, create an + * instance and use the start/stop methods. + */ + public static void onFileChange(Path file, Callback callback) throws IOException { + FileWatcher fileWatcher = new FileWatcher(); + fileWatcher.start(file, callback); + Runtime.getRuntime().addShutdownHook(new Thread(fileWatcher::stop)); + } + + public void start(Path file, Callback callback) throws IOException { + watchService = FileSystems.getDefault().newWatchService(); + Path parent = file.getParent(); + parent.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY); + log.info("Going to watch " + file); + + thread = new Thread(() -> { + while (true) { + WatchKey wk = null; + try { + wk = watchService.take(); + Thread.sleep(500); // give a chance for duplicate events to pile up + for (WatchEvent event : wk.pollEvents()) { + Path changed = parent.resolve((Path) event.context()); + if (Files.exists(changed) && Files.isSameFile(changed, file)) { + log.info("File change event: " + changed); + callback.run(); + break; + } + } + } catch (InterruptedException e) { + log.info("Ending my watch"); + Thread.currentThread().interrupt(); + break; + } catch (Exception e) { + log.error("Error while reloading cert", e); + } finally { + if (wk != null) { + wk.reset(); + } + } + } + }); + thread.start(); + } + + public void stop() { + thread.interrupt(); + try { + watchService.close(); + } catch (IOException e) { + log.info("Error closing watch service", e); + } + } +} \ No newline at end of file diff --git a/core/src/main/java/io/confluent/rest/RestConfig.java b/core/src/main/java/io/confluent/rest/RestConfig.java index 4908c44cd8..952e38676a 100644 --- a/core/src/main/java/io/confluent/rest/RestConfig.java +++ b/core/src/main/java/io/confluent/rest/RestConfig.java @@ -120,6 +120,10 @@ public class RestConfig extends AbstractConfig { + "details, etc."; protected static final String METRICS_TAGS_DEFAULT = ""; + public static final String SSL_KEYSTORE_RELOAD_CONFIG = "ssl.keystore.reload"; + protected static final String SSL_KEYSTORE_RELOAD_DOC = + "Enable auto reload of ssl key"; + protected static final boolean SSL_KEYSTORE_RELOAD_DEFAULT = false; public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location"; protected static final String SSL_KEYSTORE_LOCATION_DOC = "Location of the keystore file to use for SSL. This is required for HTTPS."; @@ -411,6 +415,12 @@ private static ConfigDef incompleteBaseConfigDef() { METRICS_TAGS_DEFAULT, Importance.LOW, METRICS_TAGS_DOC + ).define( + SSL_KEYSTORE_RELOAD_CONFIG, + Type.BOOLEAN, + SSL_KEYSTORE_RELOAD_DEFAULT, + Importance.LOW, + SSL_KEYSTORE_RELOAD_DOC ).define( SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, From 03e49741c7aae35832a1722e0f418cb45e4411d4 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Mon, 4 Nov 2019 16:10:21 -0500 Subject: [PATCH 02/17] revise auto reload --- .../java/io/confluent/rest/RestConfig.java | 2 +- .../test/java/io/confluent/rest/SslTest.java | 22 +++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/io/confluent/rest/RestConfig.java b/core/src/main/java/io/confluent/rest/RestConfig.java index 952e38676a..06a4c037d1 100644 --- a/core/src/main/java/io/confluent/rest/RestConfig.java +++ b/core/src/main/java/io/confluent/rest/RestConfig.java @@ -122,7 +122,7 @@ public class RestConfig extends AbstractConfig { public static final String SSL_KEYSTORE_RELOAD_CONFIG = "ssl.keystore.reload"; protected static final String SSL_KEYSTORE_RELOAD_DOC = - "Enable auto reload of ssl key"; + "Enable auto reload of ssl keystore"; protected static final boolean SSL_KEYSTORE_RELOAD_DEFAULT = false; public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location"; protected static final String SSL_KEYSTORE_LOCATION_DOC = diff --git a/core/src/test/java/io/confluent/rest/SslTest.java b/core/src/test/java/io/confluent/rest/SslTest.java index afb39a8a7a..2d804e59ae 100644 --- a/core/src/test/java/io/confluent/rest/SslTest.java +++ b/core/src/test/java/io/confluent/rest/SslTest.java @@ -134,6 +134,28 @@ public void testHttpAndHttps() throws Exception { } } + @Test + public void testHttpsWithAutoReload() throws Exception { + TestMetricsReporter.reset(); + Properties props = new Properties(); + String httpsUri = "https://localhost:8081"; + props.put(RestConfig.LISTENERS_CONFIG, httpsUri); + props.put(RestConfig.METRICS_REPORTER_CLASSES_CONFIG, "io.confluent.rest.TestMetricsReporter"); + props.put(RestConfig.SSL_KEYSTORE_RELOAD_CONFIG, "true"); + configServerKeystore(props); + TestRestConfig config = new TestRestConfig(props); + SslTestApplication app = new SslTestApplication(config); + try { + app.start(); + int statusCode = makeGetRequest(httpsUri + "/test", + clientKeystore.getAbsolutePath(), SSL_PASSWORD, SSL_PASSWORD); + assertEquals(EXPECTED_200_MSG, 200, statusCode); + //assertMetricsCollected(); + } finally { + app.stop(); + } + } + @Test(expected = ClientProtocolException.class) public void testHttpsOnly() throws Exception { TestMetricsReporter.reset(); From d497b1c1423e6d0239a1f8584e3b4d9d38ab97b1 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Mon, 4 Nov 2019 23:39:22 -0500 Subject: [PATCH 03/17] test case --- .../java/io/confluent/rest/FileWatcher.java | 100 ++++++++++-------- .../test/java/io/confluent/rest/SslTest.java | 46 +++++++- 2 files changed, 101 insertions(+), 45 deletions(-) diff --git a/core/src/main/java/io/confluent/rest/FileWatcher.java b/core/src/main/java/io/confluent/rest/FileWatcher.java index c813935e14..144173897b 100644 --- a/core/src/main/java/io/confluent/rest/FileWatcher.java +++ b/core/src/main/java/io/confluent/rest/FileWatcher.java @@ -28,70 +28,84 @@ import java.nio.file.WatchKey; import java.nio.file.WatchEvent; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + // reference https://gist.github.com/danielflower/f54c2fe42d32356301c68860a4ab21ed -public class FileWatcher { +public class FileWatcher implements Runnable { private static final Logger log = LoggerFactory.getLogger(FileWatcher.class); - private Thread thread; - private WatchService watchService; - public interface Callback { void run() throws Exception; } + private final WatchService watchService; + private final Path file; + private final WatchKey key; + private final Callback callback; + + public FileWatcher(Path file, Callback callback) throws IOException { + this.file = file; + this.watchService = FileSystems.getDefault().newWatchService(); + this.key = file.getParent().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY); + this.callback = callback; + } + /** * Starts watching a file and the given path and calls the callback when it is changed. * A shutdown hook is registered to stop watching. To control this yourself, create an * instance and use the start/stop methods. */ public static void onFileChange(Path file, Callback callback) throws IOException { - FileWatcher fileWatcher = new FileWatcher(); - fileWatcher.start(file, callback); - Runtime.getRuntime().addShutdownHook(new Thread(fileWatcher::stop)); + log.info("Configure watch file change: " + file); + FileWatcher fileWatcher = new FileWatcher(file, callback); + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future future = executor.submit(fileWatcher); + Runtime.getRuntime().addShutdownHook(new Thread(executor::shutdown)); } - public void start(Path file, Callback callback) throws IOException { - watchService = FileSystems.getDefault().newWatchService(); - Path parent = file.getParent(); - parent.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY); - log.info("Going to watch " + file); - - thread = new Thread(() -> { - while (true) { - WatchKey wk = null; - try { - wk = watchService.take(); - Thread.sleep(500); // give a chance for duplicate events to pile up - for (WatchEvent event : wk.pollEvents()) { - Path changed = parent.resolve((Path) event.context()); - if (Files.exists(changed) && Files.isSameFile(changed, file)) { - log.info("File change event: " + changed); - callback.run(); + public void run() { + try { + for (;;) { + log.info("Watching file change: " + file); + // wait for key to be signalled + WatchKey key = watchService.take(); + if (this.key != key) { + log.info("WatchKey not recognized"); + continue; + } + log.info("Watch Key notified"); + for (WatchEvent event : key.pollEvents()) { + WatchEvent.Kind kind = event.kind(); + if (kind != StandardWatchEventKinds.ENTRY_MODIFY) { + log.info("Watch event is not modified."); + continue; + } + WatchEvent ev = (WatchEvent)event; + Path changed = this.file.getParent().resolve(ev.context()); + try { + if (Files.isSameFile(changed, this.file)) { + log.info("Watch found file: " + file); + try { + callback.run(); + } catch (Exception e) { + log.error("Error while reloading cert", e); + } break; } + continue; + } catch (java.io.IOException e) { + log.info("Hit error process the change event", e); } - } catch (InterruptedException e) { - log.info("Ending my watch"); - Thread.currentThread().interrupt(); + } + // reset key + if (!key.reset()) { break; - } catch (Exception e) { - log.error("Error while reloading cert", e); - } finally { - if (wk != null) { - wk.reset(); - } } } - }); - thread.start(); - } - - public void stop() { - thread.interrupt(); - try { - watchService.close(); - } catch (IOException e) { - log.info("Error closing watch service", e); + } catch (InterruptedException e) { + log.info("Ending my watch"); } } } \ No newline at end of file diff --git a/core/src/test/java/io/confluent/rest/SslTest.java b/core/src/test/java/io/confluent/rest/SslTest.java index 2d804e59ae..c7c35abe96 100644 --- a/core/src/test/java/io/confluent/rest/SslTest.java +++ b/core/src/test/java/io/confluent/rest/SslTest.java @@ -37,6 +37,8 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; import java.net.SocketException; import java.security.KeyPair; import java.security.cert.X509Certificate; @@ -109,6 +111,16 @@ private void enableSslClientAuth(Properties props) { props.put(RestConfig.SSL_CLIENT_AUTH_CONFIG, true); } + private void createWrongKeystoreWithCert(File file, String alias, Map certs) throws Exception { + KeyPair keypair = TestSslUtils.generateKeyPair("RSA"); + CertificateBuilder certificateBuilder = new CertificateBuilder(30, "SHA1withRSA"); + X509Certificate cCert = certificateBuilder.sanDnsName("fail") + .generate("CN=mymachine.local, O=A client", keypair); + TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), alias, keypair.getPrivate(), cCert); + certs.put(alias, cCert); + } + + @Test public void testHttpAndHttps() throws Exception { TestMetricsReporter.reset(); @@ -138,7 +150,7 @@ public void testHttpAndHttps() throws Exception { public void testHttpsWithAutoReload() throws Exception { TestMetricsReporter.reset(); Properties props = new Properties(); - String httpsUri = "https://localhost:8081"; + String httpsUri = "https://localhost:8082"; props.put(RestConfig.LISTENERS_CONFIG, httpsUri); props.put(RestConfig.METRICS_REPORTER_CLASSES_CONFIG, "io.confluent.rest.TestMetricsReporter"); props.put(RestConfig.SSL_KEYSTORE_RELOAD_CONFIG, "true"); @@ -150,7 +162,37 @@ public void testHttpsWithAutoReload() throws Exception { int statusCode = makeGetRequest(httpsUri + "/test", clientKeystore.getAbsolutePath(), SSL_PASSWORD, SSL_PASSWORD); assertEquals(EXPECTED_200_MSG, 200, statusCode); - //assertMetricsCollected(); + assertMetricsCollected(); + + // verify reload -- override the server keystore with a new one + File serverKeystoreBak = File.createTempFile("SslTest-server-keystore", ".jks.bak"); + Files.copy(serverKeystore.toPath(), serverKeystoreBak.toPath(), StandardCopyOption.REPLACE_EXISTING); + File serverKeystoreNew; + try { + serverKeystoreNew = File.createTempFile("SslTest-server-keystore", ".jks.new"); + } catch (IOException ioe) { + throw new RuntimeException("Unable to create temporary files for trust stores and keystores."); + } + Map certs = new HashMap<>(); + createWrongKeystoreWithCert(serverKeystoreNew, "server", certs); + Files.copy(serverKeystoreNew.toPath(), serverKeystore.toPath(), StandardCopyOption.REPLACE_EXISTING); + + Thread.sleep(10000); // 5s is too short for auto reload + boolean hitError = false; + try { + makeGetRequest(httpsUri + "/test", + clientKeystore.getAbsolutePath(), SSL_PASSWORD, SSL_PASSWORD); + } catch (Exception e) { + System.out.println(e); + hitError = true; + } + + Files.copy(serverKeystoreBak.toPath(), serverKeystore.toPath(), StandardCopyOption.REPLACE_EXISTING); + Thread.sleep(10000); // 5s is too short for auto reload + statusCode = makeGetRequest(httpsUri + "/test", + clientKeystore.getAbsolutePath(), SSL_PASSWORD, SSL_PASSWORD); + assertEquals(EXPECTED_200_MSG, 200, statusCode); + assertEquals("expect hit error with new server cert", true, hitError); } finally { app.stop(); } From 66cc34f9ee5303d20b4e98a24774e62188295369 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Tue, 5 Nov 2019 00:16:27 -0500 Subject: [PATCH 04/17] revise parameter --- core/src/main/java/io/confluent/rest/FileWatcher.java | 1 - core/src/test/java/io/confluent/rest/SslTest.java | 5 +++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/io/confluent/rest/FileWatcher.java b/core/src/main/java/io/confluent/rest/FileWatcher.java index 144173897b..1c6a37feb3 100644 --- a/core/src/main/java/io/confluent/rest/FileWatcher.java +++ b/core/src/main/java/io/confluent/rest/FileWatcher.java @@ -94,7 +94,6 @@ public void run() { } break; } - continue; } catch (java.io.IOException e) { log.info("Hit error process the change event", e); } diff --git a/core/src/test/java/io/confluent/rest/SslTest.java b/core/src/test/java/io/confluent/rest/SslTest.java index c7c35abe96..2e6076bf52 100644 --- a/core/src/test/java/io/confluent/rest/SslTest.java +++ b/core/src/test/java/io/confluent/rest/SslTest.java @@ -71,6 +71,7 @@ public class SslTest { public static final String SSL_PASSWORD = "test1234"; public static final String EXPECTED_200_MSG = "Response status must be 200."; + public static final int CERT_RELOAD_WAIT_TIME = 20000; @Before public void setUp() throws Exception { @@ -177,7 +178,7 @@ public void testHttpsWithAutoReload() throws Exception { createWrongKeystoreWithCert(serverKeystoreNew, "server", certs); Files.copy(serverKeystoreNew.toPath(), serverKeystore.toPath(), StandardCopyOption.REPLACE_EXISTING); - Thread.sleep(10000); // 5s is too short for auto reload + Thread.sleep(CERT_RELOAD_WAIT_TIME); boolean hitError = false; try { makeGetRequest(httpsUri + "/test", @@ -188,7 +189,7 @@ public void testHttpsWithAutoReload() throws Exception { } Files.copy(serverKeystoreBak.toPath(), serverKeystore.toPath(), StandardCopyOption.REPLACE_EXISTING); - Thread.sleep(10000); // 5s is too short for auto reload + Thread.sleep(CERT_RELOAD_WAIT_TIME); statusCode = makeGetRequest(httpsUri + "/test", clientKeystore.getAbsolutePath(), SSL_PASSWORD, SSL_PASSWORD); assertEquals(EXPECTED_200_MSG, 200, statusCode); From 9866b0a04e91f2dd1d9dac7a2117f05ecee6eb5c Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Tue, 5 Nov 2019 10:39:55 -0500 Subject: [PATCH 05/17] revise logging --- .../java/io/confluent/rest/FileWatcher.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/io/confluent/rest/FileWatcher.java b/core/src/main/java/io/confluent/rest/FileWatcher.java index 1c6a37feb3..59343df3b3 100644 --- a/core/src/main/java/io/confluent/rest/FileWatcher.java +++ b/core/src/main/java/io/confluent/rest/FileWatcher.java @@ -68,43 +68,44 @@ public static void onFileChange(Path file, Callback callback) throws IOException public void run() { try { for (;;) { - log.info("Watching file change: " + file); + log.debug("Watching file change: " + file); // wait for key to be signalled WatchKey key = watchService.take(); if (this.key != key) { - log.info("WatchKey not recognized"); + log.debug("WatchKey not recognized"); continue; } log.info("Watch Key notified"); for (WatchEvent event : key.pollEvents()) { WatchEvent.Kind kind = event.kind(); - if (kind != StandardWatchEventKinds.ENTRY_MODIFY) { - log.info("Watch event is not modified."); + if (kind == StandardWatchEventKinds.OVERFLOW) { + log.debug("Watch event is OVERFLOW"); continue; } WatchEvent ev = (WatchEvent)event; Path changed = this.file.getParent().resolve(ev.context()); try { if (Files.isSameFile(changed, this.file)) { - log.info("Watch found file: " + file); + log.debug("Watch found matching file: " + file); try { callback.run(); } catch (Exception e) { - log.error("Error while reloading cert", e); + log.warn("Hit error callback on file change", e); } break; } } catch (java.io.IOException e) { - log.info("Hit error process the change event", e); + log.warn("Hit error process the change event", e); } } // reset key if (!key.reset()) { + log.warn("Ending watch due to key reset error"); break; } } } catch (InterruptedException e) { - log.info("Ending my watch"); + log.info("Ending watch due to interrupt"); } } } \ No newline at end of file From 843904657c63879af0d45d6fad2efa1f68d75db4 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Tue, 5 Nov 2019 20:58:18 -0500 Subject: [PATCH 06/17] shutdown logic --- .../java/io/confluent/rest/FileWatcher.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/io/confluent/rest/FileWatcher.java b/core/src/main/java/io/confluent/rest/FileWatcher.java index 59343df3b3..f583926562 100644 --- a/core/src/main/java/io/confluent/rest/FileWatcher.java +++ b/core/src/main/java/io/confluent/rest/FileWatcher.java @@ -40,6 +40,7 @@ public interface Callback { void run() throws Exception; } + private volatile boolean shutdown; private final WatchService watchService; private final Path file; private final WatchKey key; @@ -53,7 +54,7 @@ public FileWatcher(Path file, Callback callback) throws IOException { } /** - * Starts watching a file and the given path and calls the callback when it is changed. + * Starts watching a file calls the callback when it is changed. * A shutdown hook is registered to stop watching. To control this yourself, create an * instance and use the start/stop methods. */ @@ -62,12 +63,12 @@ public static void onFileChange(Path file, Callback callback) throws IOException FileWatcher fileWatcher = new FileWatcher(file, callback); ExecutorService executor = Executors.newSingleThreadExecutor(); Future future = executor.submit(fileWatcher); - Runtime.getRuntime().addShutdownHook(new Thread(executor::shutdown)); + Runtime.getRuntime().addShutdownHook(new Thread(executor::shutdownNow)); } public void run() { try { - for (;;) { + while (!shutdown) { log.debug("Watching file change: " + file); // wait for key to be signalled WatchKey key = watchService.take(); @@ -108,4 +109,14 @@ public void run() { log.info("Ending watch due to interrupt"); } } + + public void shutdown() { + shutdown = true; + try { + watchService.close(); + } catch (IOException e) { + log.info("Error closing watch service", e); + } + } + } \ No newline at end of file From 1a237ab0013bee3a4392560756d4d031a33a7f03 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Tue, 5 Nov 2019 22:55:07 -0500 Subject: [PATCH 07/17] take care of delete then create --- core/src/main/java/io/confluent/rest/FileWatcher.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/io/confluent/rest/FileWatcher.java b/core/src/main/java/io/confluent/rest/FileWatcher.java index f583926562..80df71a1d2 100644 --- a/core/src/main/java/io/confluent/rest/FileWatcher.java +++ b/core/src/main/java/io/confluent/rest/FileWatcher.java @@ -49,7 +49,10 @@ public interface Callback { public FileWatcher(Path file, Callback callback) throws IOException { this.file = file; this.watchService = FileSystems.getDefault().newWatchService(); - this.key = file.getParent().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY); + // Listen to both CREATE and MODIFY to reload, so taking care of delete then create. + this.key = file.getParent().register(watchService, + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_MODIFY); this.callback = callback; } From b974c27ed59c49764226002f11f7a6d684959f4d Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Wed, 6 Nov 2019 00:16:58 -0500 Subject: [PATCH 08/17] revise test --- .../test/java/io/confluent/rest/SslTest.java | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/core/src/test/java/io/confluent/rest/SslTest.java b/core/src/test/java/io/confluent/rest/SslTest.java index 2e6076bf52..5ccedba7e5 100644 --- a/core/src/test/java/io/confluent/rest/SslTest.java +++ b/core/src/test/java/io/confluent/rest/SslTest.java @@ -68,6 +68,8 @@ public class SslTest { private File trustStore; private File clientKeystore; private File serverKeystore; + private File serverKeystoreBak; + private File serverKeystoreErr; public static final String SSL_PASSWORD = "test1234"; public static final String EXPECTED_200_MSG = "Response status must be 200."; @@ -79,6 +81,8 @@ public void setUp() throws Exception { trustStore = File.createTempFile("SslTest-truststore", ".jks"); clientKeystore = File.createTempFile("SslTest-client-keystore", ".jks"); serverKeystore = File.createTempFile("SslTest-server-keystore", ".jks"); + serverKeystoreBak = File.createTempFile("SslTest-server-keystore", ".jks.bak"); + serverKeystoreErr = File.createTempFile("SslTest-server-keystore", ".jks.err"); } catch (IOException ioe) { throw new RuntimeException("Unable to create temporary files for trust stores and keystores."); } @@ -86,6 +90,10 @@ public void setUp() throws Exception { createKeystoreWithCert(clientKeystore, "client", certs); createKeystoreWithCert(serverKeystore, "server", certs); TestSslUtils.createTrustStore(trustStore.getAbsolutePath(), new Password(SSL_PASSWORD), certs); + + Files.copy(serverKeystore.toPath(), serverKeystoreBak.toPath(), StandardCopyOption.REPLACE_EXISTING); + certs = new HashMap<>(); + createWrongKeystoreWithCert(serverKeystoreErr, "server", certs); } private void createKeystoreWithCert(File file, String alias, Map certs) throws Exception { @@ -165,19 +173,8 @@ public void testHttpsWithAutoReload() throws Exception { assertEquals(EXPECTED_200_MSG, 200, statusCode); assertMetricsCollected(); - // verify reload -- override the server keystore with a new one - File serverKeystoreBak = File.createTempFile("SslTest-server-keystore", ".jks.bak"); - Files.copy(serverKeystore.toPath(), serverKeystoreBak.toPath(), StandardCopyOption.REPLACE_EXISTING); - File serverKeystoreNew; - try { - serverKeystoreNew = File.createTempFile("SslTest-server-keystore", ".jks.new"); - } catch (IOException ioe) { - throw new RuntimeException("Unable to create temporary files for trust stores and keystores."); - } - Map certs = new HashMap<>(); - createWrongKeystoreWithCert(serverKeystoreNew, "server", certs); - Files.copy(serverKeystoreNew.toPath(), serverKeystore.toPath(), StandardCopyOption.REPLACE_EXISTING); - + // verify reload -- override the server keystore with a wrong one + Files.copy(serverKeystoreErr.toPath(), serverKeystore.toPath(), StandardCopyOption.REPLACE_EXISTING); Thread.sleep(CERT_RELOAD_WAIT_TIME); boolean hitError = false; try { @@ -188,6 +185,7 @@ public void testHttpsWithAutoReload() throws Exception { hitError = true; } + // verify reload -- override the server keystore with a correct one Files.copy(serverKeystoreBak.toPath(), serverKeystore.toPath(), StandardCopyOption.REPLACE_EXISTING); Thread.sleep(CERT_RELOAD_WAIT_TIME); statusCode = makeGetRequest(httpsUri + "/test", From a166704380db5711bf3cc87ba66ad8d1f244c27e Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Wed, 6 Nov 2019 11:50:16 -0500 Subject: [PATCH 09/17] revise doc --- core/src/main/java/io/confluent/rest/FileWatcher.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/io/confluent/rest/FileWatcher.java b/core/src/main/java/io/confluent/rest/FileWatcher.java index 80df71a1d2..ad45adc277 100644 --- a/core/src/main/java/io/confluent/rest/FileWatcher.java +++ b/core/src/main/java/io/confluent/rest/FileWatcher.java @@ -58,8 +58,7 @@ public FileWatcher(Path file, Callback callback) throws IOException { /** * Starts watching a file calls the callback when it is changed. - * A shutdown hook is registered to stop watching. To control this yourself, create an - * instance and use the start/stop methods. + * A shutdown hook is registered to stop watching. */ public static void onFileChange(Path file, Callback callback) throws IOException { log.info("Configure watch file change: " + file); From 8665ed4a4fffd4ef76360e8863a013b0f7e4cf0c Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Wed, 6 Nov 2019 21:22:30 -0500 Subject: [PATCH 10/17] revise port --- core/src/test/java/io/confluent/rest/SslTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/io/confluent/rest/SslTest.java b/core/src/test/java/io/confluent/rest/SslTest.java index 5ccedba7e5..84eb684979 100644 --- a/core/src/test/java/io/confluent/rest/SslTest.java +++ b/core/src/test/java/io/confluent/rest/SslTest.java @@ -272,7 +272,7 @@ public void testHttpsWithNoClientCertAndNoServerTruststore() throws Exception { @Test(expected = SocketException.class) public void testHttpsWithAuthAndBadClientCert() throws Exception { Properties props = new Properties(); - String uri = "https://localhost:8080"; + String uri = "https://localhost:8084"; props.put(RestConfig.LISTENERS_CONFIG, uri); configServerKeystore(props); configServerTruststore(props); From 169115a9e7f001e099e2f373958bc3f6b158f891 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Thu, 7 Nov 2019 13:33:38 -0500 Subject: [PATCH 11/17] fix issue --- core/src/main/java/io/confluent/rest/ApplicationServer.java | 2 +- core/src/main/java/io/confluent/rest/FileWatcher.java | 3 ++- core/src/test/java/io/confluent/rest/SslTest.java | 4 +++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/io/confluent/rest/ApplicationServer.java b/core/src/main/java/io/confluent/rest/ApplicationServer.java index b34bb6d007..d772e3a2e9 100644 --- a/core/src/main/java/io/confluent/rest/ApplicationServer.java +++ b/core/src/main/java/io/confluent/rest/ApplicationServer.java @@ -254,7 +254,7 @@ private SslContextFactory createSslContextFactory(RestConfig config) { } if (config.getBoolean(RestConfig.SSL_KEYSTORE_RELOAD_CONFIG)) { - Path keystorePath = Paths.get(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG); + Path keystorePath = Paths.get(config.getString(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG)); try { FileWatcher.onFileChange(keystorePath, () -> sslContextFactory.reload(scf -> log.info("Reloaded SSL cert"))); diff --git a/core/src/main/java/io/confluent/rest/FileWatcher.java b/core/src/main/java/io/confluent/rest/FileWatcher.java index ad45adc277..361773ec05 100644 --- a/core/src/main/java/io/confluent/rest/FileWatcher.java +++ b/core/src/main/java/io/confluent/rest/FileWatcher.java @@ -87,9 +87,10 @@ public void run() { } WatchEvent ev = (WatchEvent)event; Path changed = this.file.getParent().resolve(ev.context()); + log.debug("Watch file change: " + changed); try { if (Files.isSameFile(changed, this.file)) { - log.debug("Watch found matching file: " + file); + log.debug("Watch matching file: " + file); try { callback.run(); } catch (Exception e) { diff --git a/core/src/test/java/io/confluent/rest/SslTest.java b/core/src/test/java/io/confluent/rest/SslTest.java index 84eb684979..d574129c46 100644 --- a/core/src/test/java/io/confluent/rest/SslTest.java +++ b/core/src/test/java/io/confluent/rest/SslTest.java @@ -193,7 +193,9 @@ public void testHttpsWithAutoReload() throws Exception { assertEquals(EXPECTED_200_MSG, 200, statusCode); assertEquals("expect hit error with new server cert", true, hitError); } finally { - app.stop(); + if (app != null) { + app.stop(); + } } } From 5d6380ca16947b256814f9bd577098c2c0467a6d Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Thu, 7 Nov 2019 13:39:17 -0500 Subject: [PATCH 12/17] revise test --- core/src/test/java/io/confluent/rest/SslTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/io/confluent/rest/SslTest.java b/core/src/test/java/io/confluent/rest/SslTest.java index d574129c46..3c70a4a2ea 100644 --- a/core/src/test/java/io/confluent/rest/SslTest.java +++ b/core/src/test/java/io/confluent/rest/SslTest.java @@ -274,7 +274,7 @@ public void testHttpsWithNoClientCertAndNoServerTruststore() throws Exception { @Test(expected = SocketException.class) public void testHttpsWithAuthAndBadClientCert() throws Exception { Properties props = new Properties(); - String uri = "https://localhost:8084"; + String uri = "https://localhost:8080"; props.put(RestConfig.LISTENERS_CONFIG, uri); configServerKeystore(props); configServerTruststore(props); From 3d43ea72ca8d275d60bc6be8386fd522952bbf2e Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Thu, 7 Nov 2019 13:41:11 -0500 Subject: [PATCH 13/17] revise style --- core/src/main/java/io/confluent/rest/FileWatcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/confluent/rest/FileWatcher.java b/core/src/main/java/io/confluent/rest/FileWatcher.java index 361773ec05..b67311a31d 100644 --- a/core/src/main/java/io/confluent/rest/FileWatcher.java +++ b/core/src/main/java/io/confluent/rest/FileWatcher.java @@ -122,4 +122,4 @@ public void shutdown() { } } -} \ No newline at end of file +} From d67818695afc82130d28989418bb0c2e901c2b44 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Tue, 12 Nov 2019 23:44:54 -0500 Subject: [PATCH 14/17] handle k8s secret symbolic link --- .../io/confluent/rest/ApplicationServer.java | 23 ++++++++++--- .../java/io/confluent/rest/FileWatcher.java | 33 +++++++------------ .../java/io/confluent/rest/RestConfig.java | 10 ++++++ 3 files changed, 40 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/io/confluent/rest/ApplicationServer.java b/core/src/main/java/io/confluent/rest/ApplicationServer.java index d772e3a2e9..1cacfe0cda 100644 --- a/core/src/main/java/io/confluent/rest/ApplicationServer.java +++ b/core/src/main/java/io/confluent/rest/ApplicationServer.java @@ -232,6 +232,15 @@ private void configureClientAuth(SslContextFactory sslContextFactory, RestConfig } } + private Path getWatchLocation(RestConfig config) { + Path keystorePath = Paths.get(config.getString(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG)); + String watchLocation = config.getString(RestConfig.SSL_KEYSTORE_WATCH_LOCATION_CONFIG); + if (!watchLocation.isEmpty()) { + keystorePath = Paths.get(watchLocation); + } + return keystorePath; + } + private SslContextFactory createSslContextFactory(RestConfig config) { SslContextFactory sslContextFactory = new SslContextFactory.Server(); if (!config.getString(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG).isEmpty()) { @@ -254,11 +263,17 @@ private SslContextFactory createSslContextFactory(RestConfig config) { } if (config.getBoolean(RestConfig.SSL_KEYSTORE_RELOAD_CONFIG)) { - Path keystorePath = Paths.get(config.getString(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG)); + Path watchLocation = getWatchLocation(config); try { - FileWatcher.onFileChange(keystorePath, () -> - sslContextFactory.reload(scf -> log.info("Reloaded SSL cert"))); - log.info("Enabled SSL cert auto reload: " + keystorePath); + FileWatcher.onFileChange(watchLocation, () -> { + // Need to reset the key store path for symbolic link case + sslContextFactory.setKeyStorePath( + config.getString(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG) + ); + sslContextFactory.reload(scf -> log.info("Reloaded SSL cert")); + } + ); + log.info("Enabled SSL cert auto reload for: " + watchLocation); } catch (java.io.IOException e) { log.error("Can not enabled SSL cert auto reload", e); } diff --git a/core/src/main/java/io/confluent/rest/FileWatcher.java b/core/src/main/java/io/confluent/rest/FileWatcher.java index b67311a31d..5be0116953 100644 --- a/core/src/main/java/io/confluent/rest/FileWatcher.java +++ b/core/src/main/java/io/confluent/rest/FileWatcher.java @@ -43,14 +43,13 @@ public interface Callback { private volatile boolean shutdown; private final WatchService watchService; private final Path file; - private final WatchKey key; private final Callback callback; public FileWatcher(Path file, Callback callback) throws IOException { this.file = file; this.watchService = FileSystems.getDefault().newWatchService(); // Listen to both CREATE and MODIFY to reload, so taking care of delete then create. - this.key = file.getParent().register(watchService, + file.getParent().register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY); this.callback = callback; @@ -74,10 +73,6 @@ public void run() { log.debug("Watching file change: " + file); // wait for key to be signalled WatchKey key = watchService.take(); - if (this.key != key) { - log.debug("WatchKey not recognized"); - continue; - } log.info("Watch Key notified"); for (WatchEvent event : key.pollEvents()) { WatchEvent.Kind kind = event.kind(); @@ -87,26 +82,20 @@ public void run() { } WatchEvent ev = (WatchEvent)event; Path changed = this.file.getParent().resolve(ev.context()); - log.debug("Watch file change: " + changed); - try { - if (Files.isSameFile(changed, this.file)) { - log.debug("Watch matching file: " + file); - try { - callback.run(); - } catch (Exception e) { - log.warn("Hit error callback on file change", e); - } - break; + log.info("Watch file change: " + ev.context() + "=>" + changed); + // Need to use path equals than isSameFile + if (Files.exists(changed) && changed.equals(this.file)) { + log.debug("Watch matching file: " + file); + try { + callback.run(); + } catch (Exception e) { + log.warn("Hit error callback on file change", e); } - } catch (java.io.IOException e) { - log.warn("Hit error process the change event", e); + break; } } // reset key - if (!key.reset()) { - log.warn("Ending watch due to key reset error"); - break; - } + key.reset(); } } catch (InterruptedException e) { log.info("Ending watch due to interrupt"); diff --git a/core/src/main/java/io/confluent/rest/RestConfig.java b/core/src/main/java/io/confluent/rest/RestConfig.java index 06a4c037d1..425c648693 100644 --- a/core/src/main/java/io/confluent/rest/RestConfig.java +++ b/core/src/main/java/io/confluent/rest/RestConfig.java @@ -128,6 +128,10 @@ public class RestConfig extends AbstractConfig { protected static final String SSL_KEYSTORE_LOCATION_DOC = "Location of the keystore file to use for SSL. This is required for HTTPS."; protected static final String SSL_KEYSTORE_LOCATION_DEFAULT = ""; + public static final String SSL_KEYSTORE_WATCH_LOCATION_CONFIG = "ssl.keystore.watch.location"; + protected static final String SSL_KEYSTORE_WATCH_LOCATION_DOC = + "Location to watch keystore file change if it is different from keystore location "; + protected static final String SSL_KEYSTORE_WATCH_LOCATION_DEFAULT = ""; public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password"; protected static final String SSL_KEYSTORE_PASSWORD_DOC = "The store password for the keystore file."; @@ -427,6 +431,12 @@ private static ConfigDef incompleteBaseConfigDef() { SSL_KEYSTORE_LOCATION_DEFAULT, Importance.HIGH, SSL_KEYSTORE_LOCATION_DOC + ).define( + SSL_KEYSTORE_WATCH_LOCATION_CONFIG, + Type.STRING, + SSL_KEYSTORE_WATCH_LOCATION_DEFAULT, + Importance.LOW, + SSL_KEYSTORE_WATCH_LOCATION_DOC ).define( SSL_KEYSTORE_PASSWORD_CONFIG, Type.PASSWORD, From f33ec0cb18da02a4bdcb7dc2e7cf137892a7dad1 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Thu, 14 Nov 2019 22:41:56 -0500 Subject: [PATCH 15/17] revise thread and exception handling --- .../java/io/confluent/rest/FileWatcher.java | 72 +++++++++++-------- 1 file changed, 44 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/io/confluent/rest/FileWatcher.java b/core/src/main/java/io/confluent/rest/FileWatcher.java index 5be0116953..9cdcf222cf 100644 --- a/core/src/main/java/io/confluent/rest/FileWatcher.java +++ b/core/src/main/java/io/confluent/rest/FileWatcher.java @@ -30,11 +30,19 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; // reference https://gist.github.com/danielflower/f54c2fe42d32356301c68860a4ab21ed public class FileWatcher implements Runnable { private static final Logger log = LoggerFactory.getLogger(FileWatcher.class); + private static final ExecutorService executor = Executors.newFixedThreadPool(1, + new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + return t; + } + }); public interface Callback { void run() throws Exception; @@ -62,46 +70,54 @@ public FileWatcher(Path file, Callback callback) throws IOException { public static void onFileChange(Path file, Callback callback) throws IOException { log.info("Configure watch file change: " + file); FileWatcher fileWatcher = new FileWatcher(file, callback); - ExecutorService executor = Executors.newSingleThreadExecutor(); - Future future = executor.submit(fileWatcher); - Runtime.getRuntime().addShutdownHook(new Thread(executor::shutdownNow)); + executor.submit(fileWatcher); } public void run() { try { while (!shutdown) { log.debug("Watching file change: " + file); - // wait for key to be signalled - WatchKey key = watchService.take(); - log.info("Watch Key notified"); - for (WatchEvent event : key.pollEvents()) { - WatchEvent.Kind kind = event.kind(); - if (kind == StandardWatchEventKinds.OVERFLOW) { - log.debug("Watch event is OVERFLOW"); - continue; - } - WatchEvent ev = (WatchEvent)event; - Path changed = this.file.getParent().resolve(ev.context()); - log.info("Watch file change: " + ev.context() + "=>" + changed); - // Need to use path equals than isSameFile - if (Files.exists(changed) && changed.equals(this.file)) { - log.debug("Watch matching file: " + file); - try { - callback.run(); - } catch (Exception e) { - log.warn("Hit error callback on file change", e); - } - break; - } + try { + handleNextWatchNotification(); + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + log.info("Watch service caught exeption, will continue:" + e); } - // reset key - key.reset(); } } catch (InterruptedException e) { log.info("Ending watch due to interrupt"); } } + private void handleNextWatchNotification() throws InterruptedException { + log.debug("Watching file change: " + file); + // wait for key to be signalled + WatchKey key = watchService.take(); + log.info("Watch Key notified"); + for (WatchEvent event : key.pollEvents()) { + WatchEvent.Kind kind = event.kind(); + if (kind == StandardWatchEventKinds.OVERFLOW) { + log.debug("Watch event is OVERFLOW"); + continue; + } + WatchEvent ev = (WatchEvent)event; + Path changed = this.file.getParent().resolve(ev.context()); + log.info("Watch file change: " + ev.context() + "=>" + changed); + // Need to use path equals than isSameFile + if (Files.exists(changed) && changed.equals(this.file)) { + log.debug("Watch matching file: " + file); + try { + callback.run(); + } catch (Exception e) { + log.warn("Hit error callback on file change", e); + } + break; + } + } + key.reset(); + } + public void shutdown() { shutdown = true; try { From 47ca26b6c30b91026a0d59d1eac0c24ef0f0f417 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Thu, 14 Nov 2019 22:49:00 -0500 Subject: [PATCH 16/17] revise log --- core/src/main/java/io/confluent/rest/FileWatcher.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/io/confluent/rest/FileWatcher.java b/core/src/main/java/io/confluent/rest/FileWatcher.java index 9cdcf222cf..17f573504a 100644 --- a/core/src/main/java/io/confluent/rest/FileWatcher.java +++ b/core/src/main/java/io/confluent/rest/FileWatcher.java @@ -76,7 +76,6 @@ public static void onFileChange(Path file, Callback callback) throws IOException public void run() { try { while (!shutdown) { - log.debug("Watching file change: " + file); try { handleNextWatchNotification(); } catch (InterruptedException e) { From 76f4f85ca342affeb1b848d1fc110c3ddb2bc4d7 Mon Sep 17 00:00:00 2001 From: Yang Lei Date: Thu, 14 Nov 2019 23:04:20 -0500 Subject: [PATCH 17/17] revise log --- core/src/main/java/io/confluent/rest/FileWatcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/confluent/rest/FileWatcher.java b/core/src/main/java/io/confluent/rest/FileWatcher.java index 17f573504a..c129847a95 100644 --- a/core/src/main/java/io/confluent/rest/FileWatcher.java +++ b/core/src/main/java/io/confluent/rest/FileWatcher.java @@ -81,7 +81,7 @@ public void run() { } catch (InterruptedException e) { throw e; } catch (Exception e) { - log.info("Watch service caught exeption, will continue:" + e); + log.info("Watch service caught exception, will continue:" + e); } } } catch (InterruptedException e) {