Module Information
Description: DataManager class for QuantJourney Framework
The DataManager class is the central hub for data storage and retrieval within the QuantJourney Framework. It interfaces with several kinds of databases and storage systems and connects to them dynamically based on configuration settings, giving quantitative analysis code in hedge fund operations a single access point for financial data.
Features:
Versatile storage system integration, supporting MongoDB, Arctic, Redis, S3, and kdb+, among others, to cater to diverse data storage needs.
Configuration-driven connectivity, allowing for seamless transitions between different storage backends without altering the codebase.
Advanced data handling capabilities, including support for time series data, ensuring optimal performance for both historical analysis and real-time data processing.
Comprehensive error management framework, providing detailed insights into connectivity and data retrieval issues to maintain high levels of data integrity and availability.
Streamlined data access methods, facilitating efficient data queries and manipulation across various storage platforms with minimal overhead.
Author: jpolec
Date: 27-02-2024 and 18-03-2024
Class: DataManager
Method: load_configuration
def load_configuration(
config_path: str
) -> None
Loads storage configuration from a JSON file.
Parameters:
Name | Type | Description |
---|---|---|
config_path | str | Path to the configuration file. |
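The following is a minimal sketch of how a JSON storage configuration could be loaded. The helper name `load_storage_config` and the keys `storage_type` and `mongo` are assumptions made for illustration; the source does not document the configuration schema. Unlike the documented method, which returns `None`, the sketch returns the parsed dictionary so the result can be inspected.

```python
import json
import tempfile
from pathlib import Path


def load_storage_config(config_path: str) -> dict:
    """Read a JSON configuration file and return it as a dict (hypothetical helper)."""
    path = Path(config_path)
    if not path.is_file():
        raise FileNotFoundError(f"Configuration file not found: {config_path}")
    with path.open("r", encoding="utf-8") as handle:
        config = json.load(handle)
    if not isinstance(config, dict):
        raise ValueError("Configuration root must be a JSON object")
    return config


# Usage: write a throwaway configuration file, then load it back.
with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = Path(tmp_dir) / "storage_config.json"
    # The keys below are illustrative only, not the framework's real schema.
    sample_path.write_text(
        json.dumps({"storage_type": "mongo", "mongo": {"database": "quant"}}),
        encoding="utf-8",
    )
    loaded = load_storage_config(str(sample_path))
```

Validating that the root is a JSON object keeps later key lookups from failing with a confusing error.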
Method: initialize_storage
def initialize_storage() -> None
Initializes the storage system based on the configuration.
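One way to picture configuration-driven initialization is a lookup table from backend name to initializer method. The sketch below is an assumption about the general pattern, not the framework's implementation: the class `StorageInitializer` and the configuration key `storage_types` are hypothetical, and the initializers only record that they were called.

```python
from typing import Callable, Dict, List


class StorageInitializer:
    """Hypothetical stand-in that mimics configuration-driven initialization."""

    def __init__(self, config: dict) -> None:
        self.config = config
        self.initialized: List[str] = []
        # Map each backend name to the method that sets it up.
        self._initializers: Dict[str, Callable[[], None]] = {
            "mongo": self.initialize_mongo,
            "redis": self.initialize_redis,
        }

    def initialize_mongo(self) -> None:
        self.initialized.append("mongo")  # a real version would open a connection

    def initialize_redis(self) -> None:
        self.initialized.append("redis")  # a real version would open a connection

    def initialize_storage(self) -> None:
        # "storage_types" is an assumed configuration key.
        for name in self.config.get("storage_types", []):
            initializer = self._initializers.get(name)
            if initializer is None:
                raise ValueError(f"Unsupported storage type: {name}")
            initializer()


manager = StorageInitializer({"storage_types": ["mongo", "redis"]})
manager.initialize_storage()
```

With this layout, switching backends means changing the configuration rather than the calling code.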
Method: is_shutting_down
def is_shutting_down() -> bool
Attempts to infer whether the interpreter is shutting down by checking whether dummy threading operations fail, which indicates that cleanup has started. This is a heuristic and may not be reliable across all Python versions.
Returns:
Type | Description |
---|---|
bool | True if the interpreter is shutting down; False otherwise. |
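A sketch of such a heuristic is shown below. It is not the framework's code: it combines the standard-library check `sys.is_finalizing()` (available since Python 3.5) with a dummy threading operation, on the assumption that the latter may raise once interpreter cleanup has begun.

```python
import sys
import threading


def is_shutting_down() -> bool:
    """Heuristically detect interpreter shutdown (illustrative sketch)."""
    # sys.is_finalizing() is the documented check on Python 3.5+.
    if sys.is_finalizing():
        return True
    try:
        # Dummy threading operations; these may fail once cleanup has begun.
        lock = threading.Lock()
        with lock:
            threading.current_thread()
    except Exception:
        return True
    return False


shutting_down = is_shutting_down()
```

During normal execution the function returns `False`; it is mainly useful inside `__del__` or `atexit` handlers, where skipping work is safer than touching half-destroyed modules.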
Method: close
def close() -> None
Ensure all resources are properly closed.
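A common way to make sure every resource is released is to close each client in turn and keep going when one of them fails. The sketch below illustrates that idea with a hypothetical `FakeClient` and a hypothetical `close_all` helper; the real method's handling of individual backends is not described in the source.

```python
import logging

logger = logging.getLogger("data_manager_sketch")


class FakeClient:
    """Stand-in for a database client; tracks whether close() was called."""

    def __init__(self, fail: bool = False) -> None:
        self.closed = False
        self._fail = fail

    def close(self) -> None:
        if self._fail:
            raise RuntimeError("simulated close failure")
        self.closed = True


def close_all(clients: dict) -> list:
    """Close every client, continuing past failures; return the names that failed."""
    failed = []
    for name, client in clients.items():
        try:
            client.close()
        except Exception as exc:
            logger.warning("Failed to close %s: %s", name, exc)
            failed.append(name)
    return failed


clients = {"mongo": FakeClient(), "redis": FakeClient(fail=True), "kdb": FakeClient()}
failed_names = close_all(clients)
```

Catching the exception per client means one broken connection does not leave the remaining connections open.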
Method: initialize_kdb
def initialize_kdb() -> None
Set up the connection to kdb+.
Method: initialize_s3
def initialize_s3() -> None
Set up the connection to AWS S3.
Method: initialize_arctic
def initialize_arctic() -> None
Set up the connection to ArcticDB.
Method: initialize_mongo
def initialize_mongo() -> None
Set up the connection to MongoDB.
This method initializes the MongoDB connection using credentials and other configurations provided in the class's configuration data. It validates the presence of necessary configuration parameters and establishes a connection to the specified MongoDB cluster and database.
Method: initialize_redis
def initialize_redis() -> None
Set up the connection to Redis.
Method: ensure_collection_exists
def ensure_collection_exists(
collection_name: str
) -> bool
Ensure the collection exists in the database.
Parameters:
Name | Type | Description |
---|---|---|
collection_name | str | The name of the collection to check. |
Returns:
Type | Description |
---|---|
bool | True if the collection exists; False otherwise. |
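The check can be sketched against any object that exposes `list_collection_names()` and `create_collection()`, two method names that pymongo's `Database` also provides. The `FakeDatabase` class below is a hypothetical in-memory stand-in so the example runs without a server, and creating the collection when it is missing is an assumption about the intended behavior.

```python
class FakeDatabase:
    """In-memory stand-in exposing two method names that pymongo's Database also has."""

    def __init__(self) -> None:
        self._collections = set()

    def list_collection_names(self) -> list:
        return sorted(self._collections)

    def create_collection(self, name: str) -> None:
        self._collections.add(name)


def ensure_collection_exists(db, collection_name: str) -> bool:
    """Return True if the collection exists or was created; False on failure."""
    try:
        if collection_name not in db.list_collection_names():
            # Assumed behavior: create the collection when it is missing.
            db.create_collection(collection_name)
        return True
    except Exception:
        return False


db = FakeDatabase()
created = ensure_collection_exists(db, "prices")
```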
Method: read_data
def read_data(
**kwargs
) -> list
Read data from the storage based on the storage type, using flexible arguments.
Examples:
# Read data from MongoDB
data = await data_manager.read_data(storage_type='mongo', collection_name='my_collection', query={'symbol': 'AAPL'})
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments based on the storage type. |
Returns:
Type | Description |
---|---|
list | A list of data records read from the storage. |
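Dispatching on `storage_type` can be pictured as looking up a backend-specific reader and forwarding the remaining keyword arguments to it. The sketch below is an assumption about that pattern: `MiniDataManager` is hypothetical, its "MongoDB" is a plain dictionary with dummy values, and matching is limited to simple equality.

```python
import asyncio


class MiniDataManager:
    """Hypothetical, in-memory illustration of dispatch on storage_type."""

    def __init__(self) -> None:
        # Fake "MongoDB": collection name -> list of documents (dummy values).
        self._mongo = {
            "my_collection": [
                {"symbol": "AAPL", "close": 1.0},
                {"symbol": "MSFT", "close": 2.0},
            ]
        }

    async def read_data(self, **kwargs) -> list:
        storage_type = kwargs.pop("storage_type", None)
        # Look up a method named read_data_from_<storage_type>.
        reader = getattr(self, f"read_data_from_{storage_type}", None)
        if reader is None:
            raise ValueError(f"Unsupported storage type: {storage_type!r}")
        return await reader(**kwargs)

    async def read_data_from_mongo(self, **kwargs) -> list:
        documents = self._mongo.get(kwargs["collection_name"], [])
        query = kwargs.get("query", {})
        # Equality-only matching; real MongoDB queries are far richer.
        return [
            doc for doc in documents
            if all(doc.get(key) == value for key, value in query.items())
        ]


data_manager = MiniDataManager()
records = asyncio.run(
    data_manager.read_data(
        storage_type="mongo",
        collection_name="my_collection",
        query={"symbol": "AAPL"},
    )
)
```

Because the dispatcher only strips `storage_type`, each backend reader is free to define its own keyword arguments.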
Method: read_timeseries_data
def read_timeseries_data(
**kwargs
) -> list
Read timeseries data from the storage based on the storage type, using flexible arguments.
Examples:
# Read timeseries data from MongoDB
data = await data_manager.read_timeseries_data(storage_type='mongo', collection='my_collection', metadata={'symbol': 'AAPL'}, period_start='2022-01-01', period_end='2022-12-31')
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments based on the storage type. |
Returns:
Type | Description |
---|---|
list | A list of timeseries data records read from the storage. |
Method: read_fundamental_data
def read_fundamental_data(
**kwargs
) -> list
Read fundamental data from the storage based on the storage type, using flexible arguments.
Examples:
# Read fundamental data from MongoDB
data = await data_manager.read_fundamental_data(storage_type='mongo', collection='my_collection', query={'symbol': 'AAPL'})
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments based on the storage type. |
Returns:
Type | Description |
---|---|
list | A list of fundamental data records read from the storage. |
Method: write_data
def write_data(
**kwargs
) -> None
Write data to the storage based on the storage type, using flexible arguments.
Examples:
# Write data to MongoDB
await data_manager.write_data(storage_type='mongo', collection_name='my_collection', symbol='AAPL', data=[{...}, {...}])
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments based on the storage type. |
Method: write_timeseries_data
def write_timeseries_data(
**kwargs
) -> None
Write timeseries data to the storage based on the storage type, using flexible arguments.
Examples:
# Write timeseries data to MongoDB
await data_manager.write_timeseries_data(storage_type='mongo', collection='my_collection', symbol='AAPL', data={...})
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments based on the storage type. |
Method: write_fundamental_data
def write_fundamental_data(
**kwargs
) -> None
Write fundamental data to the storage based on the storage type, using flexible arguments.
Examples:
# Write fundamental data to MongoDB
await data_manager.write_fundamental_data(storage_type='mongo', collection='my_collection', symbol='AAPL', data={...})
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments based on the storage type. |
Method: read_data_from_mongo
def read_data_from_mongo(
**kwargs
) -> list
Read data from MongoDB using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for MongoDB. |
Returns:
Type | Description |
---|---|
list | A list of data records read from MongoDB. |
Method: read_timeseries_data_from_mongo
def read_timeseries_data_from_mongo(
**kwargs
) -> list
Read timeseries data from MongoDB using flexible arguments, including metadata for filtering and checking against specified date ranges.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for MongoDB. |
Returns:
Type | Description |
---|---|
list | A list of timeseries data records read from MongoDB. |
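Filtering by metadata and by date range can be illustrated on plain dictionaries. The sketch below is an in-memory approximation under stated assumptions: the helper `filter_timeseries`, the `date` field name, the ISO date strings, and the inclusive range bounds are all hypothetical, and the values are dummy data.

```python
from datetime import date


def filter_timeseries(records: list, metadata: dict, period_start: str, period_end: str) -> list:
    """Keep records matching every metadata field and dated inside the range (inclusive)."""
    start = date.fromisoformat(period_start)
    end = date.fromisoformat(period_end)
    selected = []
    for record in records:
        # Skip records whose metadata does not match every requested field.
        if any(record.get(key) != value for key, value in metadata.items()):
            continue
        if start <= date.fromisoformat(record["date"]) <= end:
            selected.append(record)
    return selected


rows = [
    {"symbol": "AAPL", "date": "2021-12-31", "close": 1.0},
    {"symbol": "AAPL", "date": "2022-06-30", "close": 2.0},
    {"symbol": "MSFT", "date": "2022-06-30", "close": 3.0},
]
in_range = filter_timeseries(rows, {"symbol": "AAPL"}, "2022-01-01", "2022-12-31")
```

In a real MongoDB query the same two conditions would normally be pushed into the query document so that filtering happens on the server.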
Method: write_data_to_mongo
def write_data_to_mongo(
**kwargs
) -> None
Write data to MongoDB using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for MongoDB. |
Method: write_timeseries_metadata_to_mongo
def write_timeseries_metadata_to_mongo(
collection_name,
period_start,
period_end,
**metadata
) -> bool
Write timeseries metadata to MongoDB, combining and updating segments as needed.
Parameters:
Name | Type | Description |
---|---|---|
collection_name | str | The name of the collection to write metadata to. |
period_start | str | The start date of the period to write metadata for. |
period_end | str | The end date of the period to write metadata for. |
**metadata | None | Flexible keyword arguments for metadata. |
Returns:
Type | Description |
---|---|
bool | True if the metadata was written successfully; False otherwise. |
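"Combining and updating segments" suggests an interval merge: a newly written period is folded into the stored coverage segments, and segments that overlap or touch are joined. The sketch below shows that technique on date pairs. It is an assumption about the approach, and the helper `merge_segments`, the tuple representation, and the one-day adjacency rule are hypothetical.

```python
from datetime import date, timedelta


def merge_segments(segments: list, period_start: str, period_end: str) -> list:
    """Merge a new [start, end] period into existing segments, joining overlaps and adjacent days."""
    intervals = [(date.fromisoformat(s), date.fromisoformat(e)) for s, e in segments]
    intervals.append((date.fromisoformat(period_start), date.fromisoformat(period_end)))
    intervals.sort()
    merged = [intervals[0]]
    for start, end in intervals[1:]:
        last_start, last_end = merged[-1]
        # Overlapping or touching (next-day) segments are combined.
        if start <= last_end + timedelta(days=1):
            merged[-1] = (last_start, max(last_end, end))
        else:
            merged.append((start, end))
    return [(s.isoformat(), e.isoformat()) for s, e in merged]


existing = [("2022-01-01", "2022-03-31"), ("2022-07-01", "2022-09-30")]
combined = merge_segments(existing, "2022-04-01", "2022-06-30")
separate = merge_segments([("2022-01-01", "2022-01-31")], "2022-03-01", "2022-03-31")
```

Here the new second-quarter period bridges the two stored segments, so `combined` collapses to a single segment, while `separate` keeps two segments because a gap remains between them.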
Method: write_timeseries_data_to_mongo
def write_timeseries_data_to_mongo(
**kwargs
) -> None
Write timeseries data to MongoDB using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for MongoDB. |
Returns:
Type | Description |
---|---|
None | None |
Method: write_fundamental_data_to_mongo
def write_fundamental_data_to_mongo(
**kwargs
) -> None
Write fundamental data to MongoDB using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for MongoDB. |
Method: read_fundamental_data_from_mongo
def read_fundamental_data_from_mongo(
**kwargs
)
Read fundamental data from MongoDB using flexible arguments, including metadata for filtering.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for MongoDB. |
Method: read_data_from_redis
def read_data_from_redis(
**kwargs
) -> list
Read data from Redis using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for Redis. |
Returns:
Type | Description |
---|---|
list | A list of data records read from Redis. |
Method: write_data_to_redis
def write_data_to_redis(
**kwargs
) -> None
Write data to Redis using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for Redis. |
Method: read_timeseries_data_from_redis
def read_timeseries_data_from_redis(
**kwargs
)
Read timeseries data from Redis using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for Redis. |
Method: write_timeseries_data_to_redis
def write_timeseries_data_to_redis(
**kwargs
)
Write timeseries data to Redis using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for Redis. |
Method: write_metadata_to_redis
def write_metadata_to_redis(
**kwargs
) -> None
Write metadata to Redis using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for Redis. |
Method: read_metadata_from_redis
def read_metadata_from_redis(
**kwargs
) -> dict
Read metadata from Redis using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for Redis. |
Returns:
Type | Description |
---|---|
dict | A dictionary containing the metadata read from Redis. |
Method: read_data_from_kdb
def read_data_from_kdb(
**kwargs
) -> list
Read data from kdb+ using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for kdb+. |
Returns:
Type | Description |
---|---|
list | A list of data records read from kdb+. |
Method: write_data_to_kdb
def write_data_to_kdb(
**kwargs
) -> None
Store data in kdb+ using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for kdb+. |
Method: read_data_from_arctic
def read_data_from_arctic(
**kwargs
) -> list
Read data from the database (ArcticDB) using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for the database. |
Returns:
Type | Description |
---|---|
list | A list of data records read from the database. |
Method: write_data_to_arctic
def write_data_to_arctic(
**kwargs
) -> None
Write data to the Arctic library using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for the Arctic library. |
Method: read_data_from_s3
def read_data_from_s3(
**kwargs
) -> list
Read data from AWS S3 using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for AWS S3. |
Returns:
Type | Description |
---|---|
list | A list of data records read from AWS S3. |
Method: write_data_to_s3
def write_data_to_s3(
**kwargs
) -> None
Write data to AWS S3 using flexible arguments.
Parameters:
Name | Type | Description |
---|---|---|
**kwargs | None | Flexible keyword arguments for AWS S3. |