Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rqsdk 768 #908

Merged
merged 4 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rqalpha/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ class EXCHANGE(CustomEnum):
CZCE = "CZCE"
CFFEX = "CFFEX"
SGEX = "SGEX"
BJSE = "BJSE"


# noinspection PyPep8Naming
Expand Down
2 changes: 0 additions & 2 deletions rqalpha/data/base_data_source/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ def _p(name):
instruments.append(Instrument(
i,
lambda i: self._future_info_store.get_tick_size(i),
# lambda i, dt: env.data_proxy.get_futures_trading_parameters(i, dt).long_margin_ratio,
# lambda i, dt: env.data_proxy.get_futures_trading_parameters(i, dt).short_margin_ratio
))
for ins_type in self.DEFAULT_INS_TYPES:
self.register_instruments_store(InstrumentStore(instruments, ins_type))
Expand Down
16 changes: 8 additions & 8 deletions rqalpha/data/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,29 +419,26 @@ def __call__(self, path, fields, **kwargs):
h5.close()


def process_init(args: Optional[Synchronized] = None):
def process_init(args: Optional[Synchronized] = None, kwargs = None):
kwargs = kwargs or {}
import warnings
with warnings.catch_warnings(record=True):
# catch warning: rqdatac is already inited. Settings will be changed
rqdatac.init()
rqdatac.init(**kwargs)
init_logger()
# Initialize process shared variables
if args:
global sval
sval = args


def update_bundle(path, create, enable_compression=False, concurrency=1):
def update_bundle(path, create, enable_compression=False, concurrency=1, **kwargs):
if create:
_DayBarTask = GenerateDayBarTask
else:
_DayBarTask = UpdateDayBarTask

init_logger()
kwargs = {}
if enable_compression:
kwargs['compression'] = 9

day_bar_args = (
("stocks.h5", rqdatac.all_instruments('CS').order_book_id.tolist(), STOCK_FIELDS),
("indexes.h5", rqdatac.all_instruments('INDX').order_book_id.tolist(), INDEX_FIELDS),
Expand All @@ -458,8 +455,11 @@ def update_bundle(path, create, enable_compression=False, concurrency=1):

succeed = multiprocessing.Value(c_bool, True)
with ProgressedProcessPoolExecutor(
max_workers=concurrency, initializer=process_init, initargs=(succeed, )
max_workers=concurrency, initializer=process_init, initargs=(succeed, kwargs)
) as executor:
kwargs = {}
if enable_compression:
kwargs['compression'] = 9
# windows上子进程需要执行rqdatac.init, 其他os则需要执行rqdatac.reset; rqdatac.init包含了rqdatac.reset的功能
for func in gen_file_funcs:
executor.submit(GenerateFileTask(func), path)
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

[metadata]
name = rqalpha
version = 5.4.4
version = 5.5.0

[versioneer]
VCS = git
Expand Down
Loading