Skip to content

Commit

Permalink
Merge pull request #63 from xingetouzi/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
BurdenBear authored Feb 26, 2019
2 parents 5d1560d + 5c9b0ed commit cf3a2ee
Show file tree
Hide file tree
Showing 23 changed files with 1,695 additions and 77 deletions.
2 changes: 1 addition & 1 deletion vnpy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# encoding: UTF-8

__version__ = '1.1.17'
__version__ = '1.1.18'
__author__ = 'Xingetouzi'
27 changes: 24 additions & 3 deletions vnpy/api/rest/RestClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from queue import Empty, Queue
from datetime import datetime
from multiprocessing.dummy import Pool
from collections import deque

import requests
from enum import Enum
Expand Down Expand Up @@ -81,7 +82,9 @@ def __init__(self):

self._queue = Queue()
self._pool = None # type: Pool

self._queueing_times = deque(maxlen=100)
self._response_times = deque(maxlen=100)

#----------------------------------------------------------------------
def init(self, urlBase):
"""
Expand Down Expand Up @@ -153,6 +156,9 @@ def addRequest(self,
request.extra = extra
request.onFailed = onFailed
request.onError = onError
request.createDatetime = datetime.now()
request.deliverDatetime = None
request.responseDatetime = None
self._queue.put(request)
return request

Expand Down Expand Up @@ -234,14 +240,20 @@ def _processRequest(self, request, session): # type: (Request, requests.Session
request = self.sign(request)

url = self.makeFullUrl(request.path)


request.deliverDatetime = datetime.now()
self._queueing_times.append((request.deliverDatetime - request.createDatetime).total_seconds())

response = session.request(request.method,
url,
headers=request.headers,
params=request.params,
data=request.data)
request.response = response

request.responseDatetime = datetime.now()

self._response_times.append((request.responseDatetime - request.deliverDatetime).total_seconds())

httpStatusCode = response.status_code
if httpStatusCode / 100 == 2: # 2xx都算成功,尽管交易所都用200
jsonBody = response.json()
Expand Down Expand Up @@ -273,3 +285,12 @@ def makeFullUrl(self, path):
url = self.urlBase + path
return url

def getStatus(self):
"""
获取此时client的一些运行时基本信息
"""
return {
"queueing_number": self._queue.qsize(),
"avg_queueing_time": sum(self._queueing_times) / len(self._queueing_times) if len(self._queueing_times) else 0,
"avg_response_time": sum(self._response_times) / len(self._response_times) if len(self._response_times) else 0
}
2 changes: 1 addition & 1 deletion vnpy/applications/VnObserver/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import click
from vnpy.trader.app.ctaStrategy.plugins.ctaMetric.observer import run_observer
from vnpy.trader.app.ctaStrategy.plugins.ctaMetric.observers import run_observer

app_name="observer"

Expand Down
10 changes: 8 additions & 2 deletions vnpy/trader/app/ctaStrategy/ctaBacktesting.py
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,7 @@ def runOptimization(self, strategyClass, optimizationSetting):
targetValue = d[targetName]
except KeyError:
targetValue = 0
resultList.append(([str(setting)], targetValue, d))
resultList.append((setting, targetValue, d))

# 显示结果
resultList.sort(reverse=True, key=lambda result: result[1])
Expand Down Expand Up @@ -1523,6 +1523,11 @@ def addParameter(self, name, start, end=None, step=None):

self.paramDict[name] = l

def addParams(self, name, params):
if isinstance(params, str):
params = eval(params)
self.paramDict[name] = list(params)

# ----------------------------------------------------------------------
def generateSetting(self):
"""生成优化参数组合"""
Expand Down Expand Up @@ -1634,7 +1639,8 @@ def optimize(backtestEngineClass, strategyClass, setting, targetName,
targetValue = d[targetName]
except KeyError:
targetValue = 0
return (str(setting), targetValue, d)
# return (str(setting), targetValue, d)
return (setting, targetValue, d)


def gen_dates(b_date, days):
Expand Down
6 changes: 3 additions & 3 deletions vnpy/trader/app/ctaStrategy/plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ CtaEngine的核心部分是根据策略的信息(主要是SymbolList)对从Gatew
同样也根据symbol的信息路由到对应的Gateway中执行。同时原本的vnpy还提供了本地Stop单的功能。为了满足不同策略的众多需要,我们需要不断丰富CtaEngine和CtaTemplate的功能。
CtaPlugin就是为了在不改动核心部分且独立解耦地为CtaEngine加入某个新功能而实现而引入的设计。

> 在此机制下,CtaEngine有了可拓展性,可能被成为更广泛的StrategyEngine比较合适,其他的一些策略引擎和模板比如套利模板也可以在这种机制下用plugin实现。
> 在此机制下,CtaEngine有了可拓展性,可能被更广泛地称为StrategyEngine比较合适,其他的一些策略引擎和模板比如套利模板也可以在这种机制下用plugin实现。
## 功能描述
### CtaEngine中使用的信息
Expand All @@ -15,9 +15,9 @@ CtaEngine中的各种处理主要依据以下两种信息:
### CtaEngine本身的功能
CtaEngine本身由两个处理流:
- 在Event处理流中加入与策略相关的Tick、Order、Trade、Position等事件的处理,然后视需要转发进策略
- 对策略调用Gateway中的API接口进行了封装,根据调用时的参数分发到某个具体的Gateway去执行。
- 对策略调用Gateway中的API接口(主要是下单和撤单)进行了封装,根据调用时的参数分发到某个具体的Gateway去执行。
### CtaPlugin的功能
CtaPlugin的功能就是根据CtaEngine中主要用到的信息,在CtaEngine的各处理流的前后设置切面,插入自己的处理逻辑来实现自己的功能。
CtaPlugin的功能就是根据CtaEngine中主要用到的信息,在CtaEngine的各处理流及Gateway的API接口调用的前后设置切面,插入自己的处理逻辑来实现自己的功能。
具体接口的定义请参考`vnpy.trader.app.ctaStrategy.plugins`

> 由于BacktestingEngine本身的实现和接口定义都更复杂,目前各Plugin的BacktestingEngine没有统一框架,需单独实现。
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ def _update_with_tick(self, tick, freq):

def _update_with_bar(self, bar, freq):
current_bar = self._current_bars.get(freq, None)
dt = bar.datetime
bt = self._bar_timers[freq]
dt = bt.get_current_dt(bar.datetime)
finished_bar = None
if current_bar:
if bt.is_new_bar(current_bar.datetime, dt):
Expand Down
10 changes: 5 additions & 5 deletions vnpy/trader/app/ctaStrategy/plugins/ctaMetric/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Cta策略运行监控插件
## 情景说明
不同于国内期货交易,数字货币交易是**7X24小时**的,且提供的交易接口稳定性远不如CTA接口。如果需要在夜晚无人值守时运行策略,需要辅以额外的自动化风控手段,以防止策略运行出现异常甚至运行策略的服务器挂掉的情况带来的不可预计的风险。
不同于国内期货交易,数字货币交易是**7X24小时**的,且提供的交易接口稳定性远不如CTA接口。如果需要在夜晚无人值守时运行策略,需要辅以外部的自动化风控手段,以防止策略运行出现异常甚至运行策略的服务器挂掉的情况带来的不可预计的风险。
## 功能描述
该组件用于无人值守运行策略时,监控策略运行时的各项指标,推送到策略外部的监控组件,由外部组件完成风控报警或后续的一些操作,目前选用的外部监控组件为小米开源的[Open-Falcon](https://github.com/open-falcon/falcon-plus)。相对vnpy自带的风控模块主要针对发送订单做事前风控,该组件主要应用场景为事后风控,通过收集的指标也可以辅助对策略运行逻辑进行检查。

推送监控指标的功能由该组件的两部分完成:
- `ctaMetricPlugin`跟随策略启动运行,会将最新的监控指标写入一个单独的日志文件中。
- `ctaMetricObserver`需要在单独启动,observer会监控某根目录下所有记录有监控指标的日志文件,获取到最新的指标值,并按一定频率通过HTTP接口推送给open-falcon
- `ctaMetricObserver`需要在单独启动,observer会监控指定目录下所有记录有监控指标的日志文件,获取到最新的指标值,并按一定频率通过HTTP接口推送给open-falcon

这样的设计旨在尽量减小策略运行时记录监控性能指标的额外耗时,且能在策略进程意外退出时,保持监控指标的推送,从而可以监测到策略意外停止运行的情况。
## 监控指标
Expand Down Expand Up @@ -37,9 +37,9 @@ Open-Falcon中的监控指标,采用和OpenTSDB相似的数据格式:metric

| metric | tags | 监控内容 | type |
| :-: | :-: | :-: | :-: |
| strategy.heartbeat | `"strategy=%s"%(策略名称)` | 策略心跳,说明策略进程在正常运行并记录监控指标 | COUNTER |
| strategy.trading | `"strategy=%s"%(策略名称)` | 策略是否处于交易状态 | GAUGE |
| gateway.connected | `"strategy=%s,gateway=%s"%(策略名称,gateway名称)` | gateway的连接状态 | GAUGE |
| strategy.heartbeat | `"strategy=%s"%(策略名称)` | 策略心跳,说明策略进程在正常运行并记录监控指标,如果策略正常停止,该项会变为0 | COUNTER |
| strategy.trading | `"strategy=%s"%(策略名称)` | 策略是否处于交易状态,1表示处于交易状态,0表示不处于 | GAUGE |
| gateway.connected | `"strategy=%s,gateway=%s"%(策略名称,gateway名称)` | gateway的连接状态,1表示正常连接,0表示连接方向 | GAUGE |
| position.volume | `"strategy=%s,gateway=%s,symbol=%s,direction=%s"%(策略名称,gateway名称,symbol名称,多空方向)` | 按所属策略、交易合约和多空方向分组后,每组的持仓量 | GAUGE |
| trade.count | `"strategy=%s,gateway=%s,symbol=%s"%(策略名称,gateway名称,symbol名称)` | 按所属策略和交易合约分组后,每组的成交数 | GAUGE |
| trade.volume | `"strategy=%s,gateway=%s,symbol=%s"%(策略名称,gateway名称,symbol名称)` | 按所属策略和交易合约分组后,每组的成交volume | GAUGE |
Expand Down
21 changes: 19 additions & 2 deletions vnpy/trader/app/ctaStrategy/plugins/ctaMetric/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ def __init__(self):
def to_json(self):
return json.dumps(self.__dict__, cls=NumpyEncoder)

@classmethod
def from_dict(cls, dct):
obj = cls()
obj.endpoint = dct["endpoint"]
obj.metric = dct["metric"]
obj.timestamp = dct["timestamp"]
obj.step = dct["step"]
obj.value = dct["value"]
obj.counterType = dct["counterType"]
obj.tags = dct["tags"]
return obj


class OpenFalconMetricFactory(object):
PREFIX = "vnpy.cta"
Expand Down Expand Up @@ -254,8 +266,13 @@ def pushMetrics(self):
func()
except: # prevent stop eventengine's thread
self.ctaEngine.error(traceback.format_exc())
self._metricSender.pushMetrics(self._metricCaches)
# self.ctaEngine.writeCtaLog("推送%s个监控指标" % len(self._metricCaches))
st = time.time()
try:
self._metricSender.pushMetrics(self._metricCaches)
except:
self.ctaEngine.error(traceback.format_exc())
et = time.time()
self.ctaEngine.debug("推送%s个监控指标,耗时%s", len(self._metricCaches), et - st)
self.clearCache()

def clearCache(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .log import LogFileMetricObserver
from .sqlite import SqliteMetricObserver
from .utils import run_observer as run_observer_cls

def run_observer(path, url=None, cls=None):
cls = cls or SqliteMetricObserver
run_observer_cls(cls, path, url)
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def on_modified(self, event):
self.merge_data(handler)


class MetricFileObserver(object):
class LogFileMetricObserver(object):
interval = 10
reg = r".*ctaMetric.log.*" # 监控的文件

Expand Down Expand Up @@ -136,10 +136,7 @@ def push_metric(self, df):
logging.info("推送%s个指标,response:%s" % (len(payload), r.content))


def run_observer(path, url=None):
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s")
MetricFileObserver(path, url).run()


if __name__ == "__main__":
run_observer(".")
from utils import run_observer

run_observer(LogFileMetricObserver, ".")
120 changes: 120 additions & 0 deletions vnpy/trader/app/ctaStrategy/plugins/ctaMetric/observers/sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import json
import os
import time
import re
import logging

import requests
from watchdog.observers import Observer
from watchdog.events import RegexMatchingEventHandler
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from vnpy.trader.app.ctaStrategy.plugins.ctaMetric.base import NumpyEncoder
from vnpy.trader.app.ctaStrategy.plugins.ctaMetric.senders.sqlite import OpenFalconMetric as OpenFalconMetricModel
from vnpy.trader.app.ctaStrategy.plugins.ctaMetric.base import OpenFalconMetric

class HandleMetric(object):
def __init__(self, filepath):
self.filepath = filepath
self._init()

def _init(self):
engine = create_engine('sqlite:///%s' % self.filepath)
self.engine = engine
Session = sessionmaker()
Session.configure(bind=engine)
self.session = Session()

def has_table(self):
return self.engine.dialect.has_table(self.engine, OpenFalconMetricModel.__tablename__)

def get_metrics(self):
if self.has_table():
metrics = self.session.query(OpenFalconMetricModel).all()
metrics = [OpenFalconMetric.from_dict(metric.__dict__) for metric in metrics]
return metrics
else:
return []


class FileEventHandler(RegexMatchingEventHandler):
def __init__(self, root, reg):
RegexMatchingEventHandler.__init__(self, regexes=[reg])
self.handlers = {}
self.df_total = None # 所有文件的最后记录的总数据
self._root = root
self._reg = reg
self._init()

def _init(self):
for dirpath, _, filenames in os.walk(self._root):
for file in filenames:
if re.match(self._reg, file):
path = os.path.join(dirpath, file)
self.handle_file(path)

def get_metrics(self):
metrics = {}
for handler in self.handlers.values():
try:
for metric in handler.get_metrics():
key = (metric.endpoint, metric.metric)
if key in metrics:
old = metrics[key]
if old.timestamp > metric.timestamp:
continue
metrics[key] = metric
except Exception as e:
logging.exception(e)
continue
return list(metrics.values())

def handle_file(self, path):
if path not in self.handlers:
logging.info("开始处理Metric文件: %s" % path)
self.handlers[path] = HandleMetric(path)

def on_created(self, event):
self.handle_file(event.src_path)

def on_modified(self, event):
pass


class SqliteMetricObserver(object):
interval = 10
reg = r"ctaMetric.sqlite" # 监控的文件

def __init__(self, root=".", url=None):
self._root = os.path.abspath(root)
self._observer = Observer()
self._handler = FileEventHandler(self._root, self.reg)
self._observer.schedule(self._handler, self._root, True)
self._url = url or os.environ.get("OPEN_FALCON_URL", "http://localhost:1988/v1/push")

def run(self):
self._observer.start()
try:
while True:
time.sleep(self.interval)
self.push_metrics(self._handler.get_metrics()) # 每5秒push一次
except KeyboardInterrupt:
self._observer.stop()
except Exception as e:
logging.exception(e)
self._observer.join()

def dump_metrics(self, metrics):
payload = [metric.__dict__ for metric in metrics]
return json.dumps(payload, cls=NumpyEncoder)

def push_metrics(self, metrics):
r = requests.post(self._url, data=self.dump_metrics(metrics))
logging.info("推送%s个指标,response:%s" % (len(metrics), r.content))


if __name__ == "__main__":
from utils import run_observer

run_observer(SqliteMetricObserver, ".")
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

import logging

def run_observer(cls, path, url=None):
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s")
cls(path, url).run()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .log import LogfileMetricSender
from .sqlite import SqliteMetricSender
from ..base import set_sender

set_sender(SqliteMetricSender)
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
from vnpy.trader.utils import Singleton
from vnpy.trader.vtFunction import getTempPath

from .base import set_sender, MetricSender
from ..base import MetricSender

@set_sender
class LogfileMetricSender(with_metaclass(Singleton, MetricSender)):
def __init__(self):
super(LogfileMetricSender, self).__init__()
Expand Down
Loading

0 comments on commit cf3a2ee

Please sign in to comment.