From 6fc19989b7931c29bc429a38776b976775f1c001 Mon Sep 17 00:00:00 2001 From: Jakub Kowalski Date: Thu, 5 Dec 2024 13:16:29 +0100 Subject: [PATCH] Offline license (#7804) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: MichaƂ Bartoszkiewicz GitOrigin-RevId: 24be4beadca59a176d0749fba5b0aa1048df4a16 --- Cargo.lock | 7 + Cargo.toml | 2 + integration_tests/license/__init__.py | 0 integration_tests/license/test_license.py | 77 ++++++ python/pathway/tests/test_telemetry.py | 2 +- src/engine/license.rs | 294 +++++++++++++++++----- src/engine/telemetry.rs | 21 +- src/python_api.rs | 6 +- 8 files changed, 329 insertions(+), 80 deletions(-) create mode 100644 integration_tests/license/__init__.py create mode 100644 integration_tests/license/test_license.py diff --git a/Cargo.lock b/Cargo.lock index ef6449f6..17af63c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1399,6 +1399,7 @@ dependencies = [ "fiat-crypto", "rustc_version", "subtle", + "zeroize", ] [[package]] @@ -2144,6 +2145,8 @@ version = "2.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" dependencies = [ + "pkcs8", + "serde", "signature", ] @@ -2155,9 +2158,11 @@ checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871" dependencies = [ "curve25519-dalek", "ed25519", + "serde", "sha2", "signature", "subtle", + "zeroize", ] [[package]] @@ -4015,10 +4020,12 @@ dependencies = [ "deltalake", "derivative", "differential-dataflow", + "ed25519-dalek", "elasticsearch", "eyre", "futures", "glob", + "hex", "hyper 0.14.30", "id-arena", "itertools 0.13.0", diff --git a/Cargo.toml b/Cargo.toml index 450ced9d..2a32657b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,9 +35,11 @@ csv = "1.3.0" deltalake = { version = "0.19.0", features = ["datafusion", "s3-native-tls"] } derivative = "2.2.0" differential-dataflow = { path = "./external/differential-dataflow" } +ed25519-dalek = { version = "2.1.1", features = ["serde", "pkcs8"] } elasticsearch = "8.15.0-alpha.1" futures = "0.3.30" glob = "0.3.1" +hex = "0.4.3" hyper = { version = "0.14", features = ["server"] } id-arena = "2.2.1" itertools = "0.13.0" diff --git a/integration_tests/license/__init__.py b/integration_tests/license/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/integration_tests/license/test_license.py b/integration_tests/license/test_license.py new file mode 100644 index 00000000..43be6d68 --- /dev/null +++ b/integration_tests/license/test_license.py @@ -0,0 +1,77 @@ +# import pathway.internals as pw +import json +import logging +import os +import re + +import pytest + +import pathway as pw +from pathway.internals.config import _check_entitlements +from pathway.tests.utils import run_all + +PATHWAY_LICENSES = json.loads(os.environ.get("PATHWAY_LICENSES", "{}")) + + +def test_license_malformed(): + pw.set_license_key(PATHWAY_LICENSES["malformed"]) + with pytest.raises( + RuntimeError, + match=r"unable to validate license", + ): + pw.run_all() + + +def test_license_wrong_signature(): + pw.set_license_key(PATHWAY_LICENSES["wrong-signature"]) + with pytest.raises( + RuntimeError, + match=r"unable to validate license: license file is invalid", + ): + pw.run_all() + + +@pytest.mark.parametrize("license", ["default", "default-key"]) +def test_license_default_key(license, caplog): + caplog.set_level(level=logging.INFO) + pw.set_license_key(PATHWAY_LICENSES[license]) + run_all() + assert "Telemetry enabled" in caplog.text + assert "Monitoring server:" in caplog.text + + +@pytest.mark.parametrize("license", ["default", "default-key"]) +def test_license_default_key_insufficient_entitlements(license, caplog): + pw.set_license_key(PATHWAY_LICENSES[license]) + with pytest.raises( + RuntimeError, + match=re.escape( + 'one of the features you used ["XPACK-SPATIAL"] requires upgrading your Pathway license.' + ), + ): + _check_entitlements("xpack-spatial") + + +def test_license_enterprise(caplog): + caplog.set_level(level=logging.INFO) + pw.set_license_key(PATHWAY_LICENSES["enterprise-no-expire"]) + + _check_entitlements("xpack-spatial") + run_all() + + assert "Telemetry enabled" not in caplog.text + assert "Monitoring server:" in caplog.text + + +def test_license_enterprise_expired(caplog): + caplog.set_level(level=logging.INFO) + pw.set_license_key(PATHWAY_LICENSES["enterprise-expired"]) + + _check_entitlements("xpack-spatial") + run_all() + + assert ( + "License has expired. Please renew to continue using the service" in caplog.text + ) + assert "Telemetry enabled" not in caplog.text + assert "Monitoring server:" in caplog.text diff --git a/python/pathway/tests/test_telemetry.py b/python/pathway/tests/test_telemetry.py index 2212c7c4..054c75ad 100644 --- a/python/pathway/tests/test_telemetry.py +++ b/python/pathway/tests/test_telemetry.py @@ -35,7 +35,7 @@ def test_monitoring_insufficient_license(): with pytest.raises( api.EngineError, match=re.escape( - 'one of the features you used ["monitoring"] requires upgrading your Pathway license' + 'one of the features you used ["MONITORING"] requires upgrading your Pathway license' ), ): run_all() diff --git a/src/engine/license.rs b/src/engine/license.rs index eac96b59..93713ff4 100644 --- a/src/engine/license.rs +++ b/src/engine/license.rs @@ -1,59 +1,89 @@ +use std::borrow::Borrow; use std::collections::HashMap; +use std::fs; use std::time::Duration; +use crate::engine::error::DynResult; +use base64::{prelude::BASE64_STANDARD, Engine}; use cached::proc_macro::cached; -use log::info; +use chrono::{DateTime, Utc}; +use ed25519_dalek::VerifyingKey; +use ed25519_dalek::{PUBLIC_KEY_LENGTH, SIGNATURE_LENGTH}; +use hex::FromHex; +use log::{debug, warn}; use nix::sys::resource::Resource; use regex::Regex; -use serde::Deserialize; -use serde_json::{json, Value}; +use serde::{Deserialize, Serialize}; +use serde_json; +use serde_json::json; + +use super::error::DynError; const PATHWAY_LICENSE_SERVER: &str = "https://license.pathway.com"; +const PUBLIC_KEY: &str = "c6ef2abddc7da08f7c107649613a8f7dd853df3c54f6cafa656fc8b66fb3e8f3"; +const LICENSE_ALGORITHM: &str = "base64+ed25519"; + #[derive(Clone, Copy)] pub struct ResourceLimit(pub Resource, pub u64); #[derive(Clone, Hash, PartialEq, Eq, Debug)] pub enum License { LicenseKey(String), + OfflineLicense(LicenseDetails), NoLicenseKey, } impl License { - pub fn new(license_key: Option) -> Self { - if let Some(license_key) = license_key { - match license_key.trim() { - "" => License::NoLicenseKey, - _ => License::LicenseKey(license_key), + pub fn new(license_key: Option) -> Result { + match read_license_to_string(license_key)? { + key if key.is_empty() => Ok(Self::NoLicenseKey), + key if key.starts_with("-----BEGIN LICENSE FILE-----") => { + let license = read_license_content(key)?; + Ok(Self::OfflineLicense(license)) } - } else { - License::NoLicenseKey + key => Ok(Self::LicenseKey(key)), } } pub fn check_entitlements( &self, - entitlements: Vec, - ) -> Result { - check_entitlements(self.clone(), entitlements) + entitlements: impl IntoIterator>, + ) -> Result<(), Error> { + let entitlements: Vec = entitlements + .into_iter() + .map(|s| s.borrow().to_uppercase()) + .collect(); + match self { + License::NoLicenseKey => Err(Error::InsufficientLicense(entitlements)), + License::OfflineLicense(license) => { + let valid = entitlements + .iter() + .all(|e| license.entitlements.contains(e)); + if valid { + Ok(()) + } else { + Err(Error::InsufficientLicense(entitlements)) + } + } + License::LicenseKey(key) => { + check_license_key_entitlements(key.clone(), entitlements)?; + Ok(()) + } + } } pub fn telemetry_required(&self) -> bool { match self { License::NoLicenseKey => false, - License::LicenseKey(_) => { - if let Ok(result) = self.check_entitlements(vec![]) { - result.telemetry_required() - } else { - false - } - } + License::OfflineLicense(license) => license.telemetry_required, + License::LicenseKey(key) => check_license_key_entitlements(key.clone(), vec![]) + .map_or(false, |result| result.telemetry_required()), } } pub fn shortcut(&self) -> String { match self { - License::NoLicenseKey => String::new(), License::LicenseKey(key) => { let pattern = r"^([^-\s]+-){4}[^-\s]+(-[^-\s]+)*$"; let re = Regex::new(pattern).unwrap(); @@ -65,6 +95,7 @@ impl License { String::new() } } + _ => String::new(), } } } @@ -73,14 +104,20 @@ impl License { pub enum Error { #[error("one of the features you used {0:?} requires upgrading your Pathway license.\nFor more information and to obtain your license key, visit: https://pathway.com/get-license/")] InsufficientLicense(Vec), - #[error("unable to validate license")] - LicenseValidationError, + #[error("unable to validate license: {0}")] + LicenseValidationError(String), +} + +impl From<&str> for Error { + fn from(msg: &str) -> Self { + Self::LicenseValidationError(msg.to_string()) + } } #[derive(Deserialize, Clone)] -pub struct ValidationResponse { +struct ValidationResponse { valid: bool, - details: HashMap, + details: HashMap, } impl ValidationResponse { @@ -88,7 +125,7 @@ impl ValidationResponse { let key = "telemetry_required".to_string(); if self.details.contains_key(&key) { let telemetry = self.details.get(&key).unwrap(); - if let Value::Bool(telemetry_required) = telemetry { + if let serde_json::Value::Bool(telemetry_required) = telemetry { return *telemetry_required; } } @@ -97,20 +134,12 @@ impl ValidationResponse { } #[cached] -pub fn check_entitlements( - license: License, +fn check_license_key_entitlements( + license_key: String, entitlements: Vec, ) -> Result { KeygenLicenseChecker::new(PATHWAY_LICENSE_SERVER.to_string()) - .check_entitlements(license, entitlements) -} - -trait LicenseChecker { - fn check_entitlements( - &self, - license: License, - entitlements: Vec, - ) -> Result; + .check_entitlements(&license_key, entitlements) } struct KeygenLicenseChecker { @@ -125,43 +154,172 @@ impl KeygenLicenseChecker { .expect("initializing license checker should not fail"); Self { api_url, client } } -} -impl LicenseChecker for KeygenLicenseChecker { fn check_entitlements( &self, - license: License, + license_key: &str, entitlements: Vec, ) -> Result { - info!("Checking entitlements: {:?}", entitlements); - match license { - License::LicenseKey(license_key) => { - let response = self - .client - .post(format!("{}/license/validate", self.api_url)) - .header("Api-Version", "v1") - .timeout(Duration::from_secs(10)) - .json(&json!({ - "license_key": license_key, - "entitlements": entitlements - })) - .send() - .map_err(|_| Error::LicenseValidationError)?; - - if response.status().is_success() { - let result = response - .json::() - .map_err(|_| Error::LicenseValidationError)?; - if result.valid { - Ok(result) - } else { - Err(Error::InsufficientLicense(entitlements)) - } - } else { - Err(Error::LicenseValidationError) - } + let response = self + .client + .post(format!("{}/license/validate", self.api_url)) + .header("Api-Version", "v1") + .timeout(Duration::from_secs(10)) + .json(&json!({ + "license_key": license_key, + "entitlements": entitlements + })) + .send() + .map_err(|e| Error::LicenseValidationError(e.to_string()))?; + + if response.status().is_success() { + let result = response + .json::() + .map_err(|_| Error::LicenseValidationError("malformed response".to_string()))?; + if result.valid { + Ok(result) + } else { + Err(Error::InsufficientLicense(entitlements)) } - License::NoLicenseKey => Err(Error::InsufficientLicense(entitlements)), + } else { + let status = response.status(); + Err(Error::LicenseValidationError( + status.canonical_reason().unwrap_or("Unknown error").into(), + )) + } + } +} + +#[derive(Deserialize, Debug)] +struct LicenseFile<'a> { + enc: &'a str, + sig: &'a str, + alg: &'a str, +} + +#[derive(Serialize, Debug, Clone, Hash, PartialEq, Eq)] +#[allow(clippy::module_name_repetitions)] +pub struct LicenseDetails { + telemetry_required: bool, + entitlements: Vec, + expiration_date: Option>, +} + +fn deserialize_signed_license(public_key: &str, license: &str) -> DynResult { + let public_key = VerifyingKey::from_bytes(&<[u8; PUBLIC_KEY_LENGTH]>::from_hex(public_key)?)?; + + // load license + let cleaned_license = license + .replace("-----BEGIN LICENSE FILE-----", "") + .replace("-----END LICENSE FILE-----", "") + .replace('\n', ""); + + let license_data = String::from_utf8(BASE64_STANDARD.decode(cleaned_license)?)?; + let license_file: LicenseFile = serde_json::from_str(&license_data)?; + + // assert license crypto algorithm + if license_file.alg != LICENSE_ALGORITHM { + return Err(DynError::from(format!( + "algorithm {:?} not supported", + license_file.alg, + ))); + } + + // verify signature + let message = format!("license/{}", license_file.enc); + let signature: [u8; SIGNATURE_LENGTH] = BASE64_STANDARD + .decode(license_file.sig)? + .try_into() + .map_err(|_| "signature format is invalid")?; + public_key + .verify_strict(message.as_bytes(), &signature.into()) + .map_err(|_| "license file is invalid")?; + + let payload = String::from_utf8(BASE64_STANDARD.decode(license_file.enc)?)?; + let license_content: serde_json::Value = serde_json::from_str(&payload)?; + + Ok(license_content) +} + +#[cached] +fn read_license_content(license: String) -> Result { + let lic = deserialize_signed_license(PUBLIC_KEY, &license.clone()) + .map_err(|e| Error::LicenseValidationError(e.to_string()))?; + + let included = lic["included"] + .as_array() + .ok_or(Error::LicenseValidationError( + "malformed license".to_string(), + ))?; + + let mut entitlements: Vec = vec![]; + let mut telemetry_required: bool = false; + + for item in included { + match item["type"].as_str() { + Some("entitlements") => { + let item_code = + item["attributes"]["code"] + .as_str() + .ok_or(Error::LicenseValidationError( + "malformed license".to_string(), + ))?; + entitlements.push(item_code.to_string()); + } + Some("policies") => { + telemetry_required = item["attributes"]["metadata"]["telemetryRequired"] + .as_bool() + .unwrap_or(false); + } + _ => (), + } + } + + let parse_datetime = |value: &serde_json::Value| -> Result>, Error> { + match value.as_str() { + Some(d) => d + .parse::>() + .map(Some) + .map_err(|_| Error::LicenseValidationError("malformed license".to_string())), + None => Ok(None), + } + }; + + let expiration_date = parse_datetime(&lic["data"]["attributes"]["expiry"])?; + let license_file_ttl = parse_datetime(&lic["meta"]["expiry"])?; + + let is_expired = |date: Option>| date.is_some_and(|d| d < Utc::now()); + + if is_expired(expiration_date) { + warn!("License has expired. Please renew to continue using the service."); + } else if is_expired(license_file_ttl) { + warn!("License file's time-to-live has been exceeded. Please update the license file."); + } + + let result = LicenseDetails { + telemetry_required, + entitlements, + expiration_date, + }; + + debug!( + "License > {}", + serde_json::to_string_pretty(&result).unwrap() + ); + + Ok(result) +} + +#[cached] +pub fn read_license_to_string(license_key_or_path: Option) -> Result { + if let Some(key_or_path) = license_key_or_path { + let key_or_path = key_or_path.trim(); + if let Some(file_path) = key_or_path.strip_prefix("file://") { + fs::read_to_string(file_path).map_err(|e| Error::LicenseValidationError(e.to_string())) + } else { + Ok(key_or_path.to_string()) } + } else { + Ok(String::new()) } } diff --git a/src/engine/telemetry.rs b/src/engine/telemetry.rs index c9d7181f..0166de79 100644 --- a/src/engine/telemetry.rs +++ b/src/engine/telemetry.rs @@ -177,6 +177,8 @@ fn deduplicate(input: Vec>) -> Vec { #[derive(Clone, Debug)] #[allow(clippy::module_name_repetitions)] pub struct TelemetryEnabled { + pub telemetry_server: Option, + pub monitoring_server: Option, pub logging_servers: Vec, pub tracing_servers: Vec, pub metrics_servers: Vec, @@ -206,7 +208,7 @@ impl Config { if monitoring_server.is_some() { license - .check_entitlements(vec!["monitoring".to_string()]) + .check_entitlements(["monitoring"]) .map_err(DynError::from)?; } @@ -221,14 +223,14 @@ impl Config { } match license { - License::LicenseKey(_) => Config::create_enabled( + License::NoLicenseKey => Ok(Config::Disabled), + _ => Config::create_enabled( run_id, telemetry_server, monitoring_server, trace_parent, license, ), - License::NoLicenseKey => Ok(Config::Disabled), } } @@ -252,6 +254,8 @@ impl Config { } }); Ok(Config::Enabled(Box::new(TelemetryEnabled { + telemetry_server: telemetry_server.clone(), + monitoring_server: monitoring_server.clone(), logging_servers: deduplicate(vec![monitoring_server.clone()]), tracing_servers: deduplicate(vec![telemetry_server.clone(), monitoring_server.clone()]), metrics_servers: deduplicate(vec![telemetry_server, monitoring_server]), @@ -403,13 +407,14 @@ impl Drop for Runner { pub fn maybe_run_telemetry_thread(graph: &dyn Graph, config: Config) -> Option { match config { Config::Enabled(config) => { - if config.tracing_servers.is_empty() { - debug!("Telemetry disabled"); - } else { + if config.telemetry_server.is_some() { info!("Telemetry enabled"); } - debug!("OTEL config: {config:?}"); - let telemetry = Telemetry::new(config); + if let Some(monitoring_server) = config.monitoring_server.clone() { + info!("Monitoring server: {monitoring_server}"); + } + + let telemetry = Telemetry::new(config.clone()); let stats_shared = Arc::new(ArcSwapOption::from(None)); let runner = Runner::run(telemetry, stats_shared.clone()); diff --git a/src/python_api.rs b/src/python_api.rs index b86833d1..afcad87b 100644 --- a/src/python_api.rs +++ b/src/python_api.rs @@ -3289,7 +3289,7 @@ pub fn run_with_new_graph( None } }; - let license = License::new(license_key); + let license = License::new(license_key)?; let telemetry_config = EngineTelemetryConfig::create(&license, run_id, monitoring_server, trace_parent)?; let results: Vec> = run_with_wakeup_receiver(py, |wakeup_receiver| { @@ -3817,7 +3817,7 @@ impl TelemetryConfig { license_key: Option, monitoring_server: Option, ) -> PyResult { - let license = License::new(license_key); + let license = License::new(license_key)?; let config = EngineTelemetryConfig::create(&license, run_id, monitoring_server, None)?; Ok(config.into()) } @@ -5339,7 +5339,7 @@ impl From for PyErr { entitlements, ))] fn check_entitlements(license_key: Option, entitlements: Vec) -> PyResult<()> { - License::new(license_key).check_entitlements(entitlements)?; + License::new(license_key)?.check_entitlements(entitlements)?; Ok(()) }