Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Use off-heap storage for forecasting large models #22

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/autodetect/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ INSTALL_DIR=$(CPP_PLATFORM_HOME)/bin
ML_LIBS=$(LIB_ML_CORE) $(LIB_ML_MATHS) $(LIB_ML_MODEL) $(LIB_ML_API)

USE_BOOST=1
USE_BOOST_FILESYSTEM_LIBS=1
USE_BOOST_PROGRAMOPTIONS_LIBS=1
USE_RAPIDJSON=1
USE_EIGEN=1
Expand Down
26 changes: 24 additions & 2 deletions include/api/CForecastRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <model/CForecastDataSink.h>
#include <model/CResourceMonitor.h>

#include <boost/filesystem.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/unordered_set.hpp>

Expand Down Expand Up @@ -80,10 +81,22 @@ class API_EXPORT CForecastRunner final: private core::CNonCopyable
static const size_t DEFAULT_EXPIRY_TIME = 14 * core::constants::DAY;

//! max memory allowed to use for forecast models
static const size_t MAX_FORECAST_MODEL_MEMORY = 20971520; // 20MB
static const size_t MAX_FORECAST_MODEL_MEMORY = 20971520ull; // 20MB

// Note: This value measures the size in memory, not the size of the persistence,
// which is likely higher and would be hard to calculate upfront
//! max memory allowed to use for forecast models persisting to disk
static const size_t MAX_FORECAST_MODEL_PERSISTANCE_MEMORY = 524288000ull; // 500MB

// Note: This value is lower than on X-pack side to prevent side-effects,
// if you change this value also change the limit on X-pack side.
// The purpose of this value is to guard the rest of the system regarding
// an out of disk space
//! minimum disk space required for disk persistence
static const size_t MIN_FORECAST_AVAILABLE_DISK_SPACE = 4294967296ull; // 4GB

//! minimum time between stat updates to prevent to many updates in a short time
static const uint64_t MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE = 3000; // 3s
static const uint64_t MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE = 3000ul; // 3s

private:
static const std::string ERROR_FORECAST_REQUEST_FAILED_TO_PARSE;
Expand All @@ -94,6 +107,8 @@ class API_EXPORT CForecastRunner final: private core::CNonCopyable
static const std::string ERROR_NO_CREATE_TIME;
static const std::string ERROR_BAD_MEMORY_STATUS;
static const std::string ERROR_MEMORY_LIMIT;
static const std::string ERROR_MEMORY_LIMIT_DISK;
static const std::string ERROR_MEMORY_LIMIT_DISKSPACE;
static const std::string ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS;
static const std::string ERROR_NO_SUPPORTED_FUNCTIONS;
static const std::string WARNING_DURATION_LIMIT;
Expand All @@ -112,6 +127,7 @@ class API_EXPORT CForecastRunner final: private core::CNonCopyable
using TForecastModelWrapper = model::CForecastDataSink::SForecastModelWrapper;
using TForecastResultSeries = model::CForecastDataSink::SForecastResultSeries;
using TForecastResultSeriesVec = std::vector<TForecastResultSeries>;
using TMathsModelPtr = boost::shared_ptr<maths::CModel>;

using TStrUSet = boost::unordered_set<std::string>;

Expand Down Expand Up @@ -201,6 +217,9 @@ class API_EXPORT CForecastRunner final: private core::CNonCopyable

//! A collection storing important messages from forecasting
TStrUSet s_Messages;

//! A directory to persist models on disk
std::string s_TemporaryFolder;
};

private:
Expand All @@ -213,6 +232,9 @@ class API_EXPORT CForecastRunner final: private core::CNonCopyable
//! Check for new jobs, blocks while waiting
bool tryGetJob(SForecast &forecastJob);

//! check for sufficient disk space
bool sufficientAvailableDiskSpace(const boost::filesystem::path &path);

//! pushes new jobs into the internal 'queue' (thread boundary)
bool push(SForecast &forecastJob);

Expand Down
20 changes: 20 additions & 0 deletions include/core/RestoreMacros.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,26 @@ namespace core
continue; \
}

#define RESTORE_ENUM(tag, target, enumtype) \
if (name == tag) \
{ \
int value; \
if (core::CStringUtils::stringToType(traverser.value(), value) == false) \
{ \
LOG_ERROR("Failed to restore " #tag ", got " << traverser.value()); \
return false; \
} \
target = enumtype(value); \
continue; \
}

#define RESTORE_ENUM_CHECKED(tag, target, enumtype, restoreSuccess) \
if (name == tag) \
{ \
restoreSuccess = true; \
RESTORE_ENUM(tag, target, enumtype) \
}

#define RESTORE_SETUP_TEARDOWN(tag, setup, restore, teardown) \
if (name == tag) \
{ \
Expand Down
3 changes: 2 additions & 1 deletion include/model/CAnomalyDetector.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ class MODEL_EXPORT CAnomalyDetector : private core::CNonCopyable
CForecastDataSink::SForecastModelPrerequisites getForecastPrerequisites() const;

//! Generate maths models for forecasting
CForecastDataSink::SForecastResultSeries getForecastModels() const;
CForecastDataSink::SForecastResultSeries getForecastModels(bool persistOnDisk = false,
const std::string &persistenceFolder = EMPTY_STRING) const;

//! Remove dead models, i.e. those models that have more-or-less
//! reverted back to their non-informative state. BE CAREFUL WHEN
Expand Down
3 changes: 3 additions & 0 deletions include/model/CCountingModelFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ class MODEL_EXPORT CCountingModelFactory : public CModelFactory
virtual void bucketResultsDelay(std::size_t bucketResultsDelay) ;
//@}

//! Get the minimum seasonal variance scale
virtual double minimumSeasonalVarianceScale() const;

private:
//! Get the field values which partition the data for modeling.
virtual TStrCRefVec partitioningFields(void) const;
Expand Down
3 changes: 3 additions & 0 deletions include/model/CEventRateModelFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ class MODEL_EXPORT CEventRateModelFactory : public CModelFactory
virtual void bucketResultsDelay(std::size_t bucketResultsDelay) ;
//@}

//! Get the minimum seasonal variance scale
virtual double minimumSeasonalVarianceScale() const;

private:
//! Get the field values which partition the data for modeling.
virtual TStrCRefVec partitioningFields(void) const;
Expand Down
3 changes: 3 additions & 0 deletions include/model/CEventRatePopulationModelFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ class MODEL_EXPORT CEventRatePopulationModelFactory : public CModelFactory
virtual void bucketResultsDelay(std::size_t bucketResultsDelay) ;
//@}

//! Get the minimum seasonal variance scale
virtual double minimumSeasonalVarianceScale() const;

private:
//! Get the field values which partition the data for modeling.
virtual TStrCRefVec partitioningFields(void) const;
Expand Down
8 changes: 6 additions & 2 deletions include/model/CForecastDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include <model/ImportExport.h>
#include <model/ModelTypes.h>
#include <model/CModelParams.h>

#include <rapidjson/allocators.h>
#include <rapidjson/fwd.h>
Expand All @@ -50,7 +51,7 @@ namespace model
class MODEL_EXPORT CForecastDataSink final : private core::CNonCopyable
{
public:
using TMathsModelPtr = std::unique_ptr<maths::CModel>;
using TMathsModelPtr = boost::shared_ptr<maths::CModel>;
using TStrUMap = boost::unordered_set<std::string>;

//! Wrapper for 1 timeseries model, its feature and by Field
Expand All @@ -73,18 +74,21 @@ class MODEL_EXPORT CForecastDataSink final : private core::CNonCopyable
//! Everything that defines 1 series of forecasts
struct MODEL_EXPORT SForecastResultSeries
{
SForecastResultSeries();
SForecastResultSeries(const SModelParams &modelParams);

SForecastResultSeries(SForecastResultSeries &&other);

SForecastResultSeries(const SForecastResultSeries &that) = delete;
SForecastResultSeries & operator=(const SForecastResultSeries &) = delete;

SModelParams s_ModelParams;
int s_DetectorIndex;
std::vector<SForecastModelWrapper> s_ToForecast;
std::string s_ToForecastPersisted;
std::string s_PartitionFieldName;
std::string s_PartitionFieldValue;
std::string s_ByFieldName;
double s_MinimumSeasonalVarianceScale;
};

//! \brief Data describing prerequisites prior predictions
Expand Down
123 changes: 123 additions & 0 deletions include/model/CForecastModelPersist.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* ELASTICSEARCH CONFIDENTIAL
*
* Copyright (c) 2018 Elasticsearch BV. All Rights Reserved.
*
* Notice: this software, and all information contained
* therein, is the exclusive property of Elasticsearch BV
* and its licensors, if any, and is protected under applicable
* domestic and foreign law, and international treaties.
*
* Reproduction, republication or distribution without the
* express written consent of Elasticsearch BV is
* strictly prohibited.
*/

#ifndef INCLUDED_ml_model_CForecastModelPersist_h
#define INCLUDED_ml_model_CForecastModelPersist_h

#include <core/CJsonStatePersistInserter.h>
#include <core/CJsonStateRestoreTraverser.h>

#include <maths/CModel.h>

#include <model/CModelParams.h>
#include <model/ImportExport.h>
#include <model/ModelTypes.h>

#include <boost/filesystem.hpp>

#include <fstream>
#include <memory>

namespace ml
{
namespace model
{

//! \brief Persist/Restore CModel sub-classes to/from text representations for
//! the purpose of forecasting.
//!
//! DESCRIPTION:\n
//! Persists/Restores models to disk for the purpose of restoring and forecasting
//! on them.
//!
//! IMPLEMENTATION DECISIONS:\n
//! Only as complete as required for forecasting.
//!
//! Persist and Restore are only done to avoid heap memory usage using temporary disk space.
//! No need for backwards compatibility and version'ing as code will only be used
//! locally never leaving process/io boundaries.
class MODEL_EXPORT CForecastModelPersist final
{
public:
using TMathsModelPtr = boost::shared_ptr<maths::CModel>;

public:
class CPersist final {
public:
explicit CPersist(const std::string &temporaryPath);

//! add a model to the persistence
void addModel(const maths::CModel *model,
const model_t::EFeature feature,
const std::string &byFieldValue);

//! close the outputStream
const std::string &finalizePersistAndGetFile();

private:
static void persistOneModel(core::CStatePersistInserter &inserter,
const maths::CModel *model,
const model_t::EFeature feature,
const std::string &byFieldValue);

private:
//! the filename where to persist to
boost::filesystem::path m_FileName;

//! the actual file where it models are persisted to
std::ofstream m_OutStream;

//! the persist inserter
std::unique_ptr<core::CJsonStatePersistInserter> m_PersistInserter;
};

class CRestore final {
public:
explicit CRestore(const SModelParams &modelParams,
double minimumSeasonalVarianceScale,
const std::string &fileName);

//! add a model to the persistence
bool nextModel(TMathsModelPtr &model,
model_t::EFeature &feature,
std::string &byFieldValue);

private:
static bool restoreOneModel(core::CStateRestoreTraverser &traverser,
SModelParams modelParams,
double inimumSeasonalVarianceScale,
TMathsModelPtr &model,
model_t::EFeature &feature,
std::string &byFieldValue);

private:
//! model parameters required in order to restore the model
SModelParams m_ModelParams;

//! minimum seasonal variance scale specific to the model
double m_MinimumSeasonalVarianceScale;

//! the actual file where it models are persisted to
std::ifstream m_InStream;

//! the persist inserter
core::CJsonStateRestoreTraverser m_RestoreTraverser;
}; // class CRestore
}; // class CForecastModelPersist

}
}

#endif // INCLUDED_ml_model_CForecastModelPersist_h
3 changes: 3 additions & 0 deletions include/model/CMetricModelFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ class MODEL_EXPORT CMetricModelFactory : public CModelFactory
virtual void bucketResultsDelay(std::size_t bucketResultsDelay) ;
//@}

//! Get the minimum seasonal variance scale
virtual double minimumSeasonalVarianceScale() const;

private:
//! Get the field values which partition the data for modeling.
virtual TStrCRefVec partitioningFields(void) const;
Expand Down
3 changes: 3 additions & 0 deletions include/model/CMetricPopulationModelFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ class MODEL_EXPORT CMetricPopulationModelFactory : public CModelFactory
virtual void bucketResultsDelay(std::size_t bucketResultsDelay) ;
//@}

//! Get the minimum seasonal variance scale
virtual double minimumSeasonalVarianceScale() const;

private:
//! Get the field values which partition the data for modeling.
virtual TStrCRefVec partitioningFields(void) const;
Expand Down
3 changes: 3 additions & 0 deletions include/model/CModelFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ class MODEL_EXPORT CModelFactory
//! component.
std::size_t componentSize(void) const;

// Get the minimum seasonal variance scale, specific to the model
virtual double minimumSeasonalVarianceScale() const = 0;

protected:
using TMultivariatePriorPtrVec = std::vector<TMultivariatePriorPtr>;
using TOptionalSearchKey = boost::optional<CSearchKey>;
Expand Down
Loading