
Module Information

Description: DataManager class for QuantJourney Framework

The DataManager class is the central hub for data storage and retrieval within the QuantJourney Framework. It interfaces with several types of databases and storage systems and selects the backend from configuration settings, so the same calling code can work against different stores. It is intended for quantitative analysis workflows such as those in hedge fund operations.

Features:

Versatile storage system integration, supporting MongoDB, Redis, Arctic, S3, and kdb+, among others, to cater to diverse data storage needs.

Configuration-driven connectivity, allowing for seamless transitions between different storage backends without altering the codebase.

Advanced data handling capabilities, including support for time series data, ensuring optimal performance for both historical analysis and real-time data processing.

Comprehensive error management framework, providing detailed insights into connectivity and data retrieval issues to maintain high levels of data integrity and availability.

Streamlined data access methods, facilitating efficient data queries and manipulation across various storage platforms with minimal overhead.

Author: jpolec

Date: 27-02-2024 and 18-03-2024

Class: DataManager

Method: load_configuration

def load_configuration(
            config_path: str
        ) -> None

Loads storage configuration from a JSON file.

Parameters:

Name Type Description
config_path str Path to the configuration file.
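
As a rough illustration of loading a JSON storage configuration, the sketch below reads a file and returns its contents. The function name `load_storage_config`, the returned dict, and the validation steps are assumptions made for this example; the documented method returns None and presumably stores the result on the instance.

```python
import json
from pathlib import Path


def load_storage_config(config_path: str) -> dict:
    """Read a JSON storage configuration file and return it as a dict.

    Hypothetical standalone helper; not the framework's implementation.
    """
    path = Path(config_path)
    if not path.is_file():
        raise FileNotFoundError(f"Configuration file not found: {config_path}")
    with path.open("r", encoding="utf-8") as fh:
        config = json.load(fh)
    # A storage configuration is expected to be a JSON object (assumption).
    if not isinstance(config, dict):
        raise ValueError("Top-level JSON value must be an object")
    return config
```

Failing early on a missing or malformed file keeps later backend initialization errors easier to diagnose.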

Method: initialize_storage

def initialize_storage() -> None

Initializes the storage system based on the configuration.

Method: is_shutting_down

def is_shutting_down() -> bool

Attempts to infer whether the interpreter is shutting down by checking whether dummy threading operations fail, which would indicate that cleanup has started. This is a heuristic and may not be reliable across all Python versions.

Returns:

Type Description
bool True if the interpreter is shutting down; False otherwise.
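
One way such a heuristic could look is sketched below. It combines the standard-library check `sys.is_finalizing()` with a dummy thread start; using `sys.is_finalizing()` and treating `RuntimeError` as the shutdown signal are assumptions of this example, not confirmed details of the framework's method.

```python
import sys
import threading


def is_shutting_down() -> bool:
    """Heuristically report whether the interpreter is finalizing."""
    # Documented standard-library check for interpreter finalization.
    if sys.is_finalizing():
        return True
    try:
        # Dummy threading operation: on some CPython versions, starting a
        # thread after shutdown has begun raises RuntimeError.
        probe = threading.Thread(target=lambda: None)
        probe.start()
        probe.join()
    except RuntimeError:
        return True
    return False
```

A check like this is typically called from cleanup paths (for example `__del__`) to skip work that can no longer succeed.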

Method: close

def close() -> None

Ensure all resources are properly closed.

Method: initialize_kdb

def initialize_kdb() -> None

Set up the connection to kdb+.

Method: initialize_s3

def initialize_s3() -> None

Set up the connection to AWS S3.

Method: initialize_arctic

def initialize_arctic() -> None

Set up the connection to ArcticDB.

Method: initialize_mongo

def initialize_mongo() -> None

Set up the connection to MongoDB.

This method initializes the MongoDB connection using credentials and other configurations provided in the class's configuration data. It validates the presence of necessary configuration parameters and establishes a connection to the specified MongoDB cluster and database.

Method: initialize_redis

def initialize_redis() -> None

Set up the connection to Redis.

Method: ensure_collection_exists

def ensure_collection_exists(
            collection_name: str
        ) -> bool

Ensure the collection exists in the database.

Parameters:

Name Type Description
collection_name str The name of the collection to check.

Returns:

Type Description
bool True if the collection exists; False otherwise.
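
A minimal sketch of an existence check follows. The `db` argument is assumed to be a pymongo `Database`-like object exposing `list_collection_names()` and `create_collection()`. This reference does not say whether the real method creates a missing collection, so the create step and the function name `ensure_collection` are assumptions.

```python
def ensure_collection(db, collection_name: str) -> bool:
    """Return True if the collection exists (creating it when missing).

    Hypothetical helper; `db` is assumed to behave like pymongo's Database.
    """
    try:
        if collection_name not in db.list_collection_names():
            # Assumption: a missing collection is created rather than reported.
            db.create_collection(collection_name)
        return True
    except Exception:
        # Any connectivity or permission error is reported as False.
        return False
```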

Method: read_data

def read_data(
            **kwargs
        ) -> list

Read data from the storage based on the storage type, using flexible arguments.

Examples:

# Read data from MongoDB
data = await data_manager.read_data(storage_type='mongo', collection_name='my_collection', query={'symbol': 'AAPL'})

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments based on the storage type.

Returns:

Type Description
list A list of data records read from the storage.
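
The routing by storage type described above might be sketched as follows. The class `StorageDispatcher`, its private reader methods, and the placeholder return values are hypothetical; only the `storage_type`, `collection_name`, and `query` keyword names come from the example in this section.

```python
import asyncio


class StorageDispatcher:
    """Routes read calls to a backend-specific coroutine by storage_type."""

    def __init__(self) -> None:
        # Map each supported storage type to its reader coroutine.
        self._readers = {
            "mongo": self._read_from_mongo,
            "redis": self._read_from_redis,
        }

    async def read_data(self, **kwargs) -> list:
        storage_type = kwargs.pop("storage_type", None)
        reader = self._readers.get(storage_type)
        if reader is None:
            raise ValueError(f"Unsupported storage type: {storage_type!r}")
        # Remaining keyword arguments are passed through to the backend.
        return await reader(**kwargs)

    async def _read_from_mongo(self, collection_name: str, query: dict) -> list:
        # Placeholder: a real backend would query MongoDB here.
        return [{"backend": "mongo", "collection": collection_name, **query}]

    async def _read_from_redis(self, key: str) -> list:
        # Placeholder: a real backend would query Redis here.
        return [{"backend": "redis", "key": key}]


if __name__ == "__main__":
    dispatcher = StorageDispatcher()
    rows = asyncio.run(
        dispatcher.read_data(
            storage_type="mongo",
            collection_name="my_collection",
            query={"symbol": "AAPL"},
        )
    )
    print(rows)
```

A lookup table keeps the public method small and makes adding a backend a one-line change.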

Method: read_timeseries_data

def read_timeseries_data(
            **kwargs
        ) -> list

Read timeseries data from the storage based on the storage type, using flexible arguments.

Examples:

# Read timeseries data from MongoDB
data = await data_manager.read_timeseries_data(storage_type='mongo', collection='my_collection', metadata={'symbol': 'AAPL'}, period_start='2022-01-01', period_end='2022-12-31')

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments based on the storage type.

Returns:

Type Description
list A list of timeseries data records read from the storage.

Method: read_fundamental_data

def read_fundamental_data(
            **kwargs
        ) -> list

Read fundamental data from the storage based on the storage type, using flexible arguments.

Examples:

# Read fundamental data from MongoDB
data = await data_manager.read_fundamental_data(storage_type='mongo', collection='my_collection', query={'symbol': 'AAPL'})

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments based on the storage type.

Returns:

Type Description
list A list of fundamental data records read from the storage.

Method: write_data

def write_data(
            **kwargs
        ) -> None

Write data to the storage based on the storage type, using flexible arguments.

Examples:

# Write data to MongoDB
await data_manager.write_data(storage_type='mongo', collection_name='my_collection', symbol='AAPL', data=[{...}, {...}])

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments based on the storage type.

Method: write_timeseries_data

def write_timeseries_data(
            **kwargs
        ) -> None

Write timeseries data to the storage based on the storage type, using flexible arguments.

Examples:

# Write timeseries data to MongoDB
await data_manager.write_timeseries_data(storage_type='mongo', collection='my_collection', symbol='AAPL', data={...})

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments based on the storage type.

Method: write_fundamental_data

def write_fundamental_data(
            **kwargs
        ) -> None

Write fundamental data to the storage based on the storage type, using flexible arguments.

Examples:

# Write fundamental data to MongoDB
await data_manager.write_fundamental_data(storage_type='mongo', collection='my_collection', symbol='AAPL', data={...})

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments based on the storage type.

Method: read_data_from_mongo

def read_data_from_mongo(
            **kwargs
        ) -> list

Read data from MongoDB using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for MongoDB.

Returns:

Type Description
list A list of data records read from MongoDB.

Method: read_timeseries_data_from_mongo

def read_timeseries_data_from_mongo(
            **kwargs
        ) -> list

Read timeseries data from MongoDB using flexible arguments, including metadata for filtering and checking against specified date ranges.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for MongoDB.

Returns:

Type Description
list A list of timeseries data records read from MongoDB.
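
To illustrate what checking a request against stored date ranges could involve, the sketch below tests whether a requested period falls inside any stored segment. The segment representation (a list of `(start, end)` date tuples, already merged) and the function name `is_range_covered` are assumptions for this example.

```python
from datetime import date


def is_range_covered(segments, period_start: str, period_end: str) -> bool:
    """Return True if [period_start, period_end] lies within one segment.

    `segments` is assumed to be a list of non-overlapping (start, end)
    date tuples; the period bounds are ISO 'YYYY-MM-DD' strings.
    """
    start = date.fromisoformat(period_start)
    end = date.fromisoformat(period_end)
    return any(
        seg_start <= start and end <= seg_end
        for seg_start, seg_end in segments
    )
```

If the range is not covered, a caller could fetch the missing dates from the upstream source before reading from storage.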

Method: write_data_to_mongo

def write_data_to_mongo(
            **kwargs
        ) -> None

Write data to MongoDB using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for MongoDB.

Method: write_timeseries_metadata_to_mongo

def write_timeseries_metadata_to_mongo(
            collection_name,
            period_start,
            period_end,
            **metadata
        ) -> bool

Write timeseries metadata to MongoDB, combining and updating segments as needed.

Parameters:

Name Type Description
collection_name str The name of the collection to write metadata to.
period_start str The start date of the period to write metadata for.
period_end str The end date of the period to write metadata for.
**metadata None Flexible keyword arguments for metadata.

Returns:

Type Description
bool True if the metadata was written successfully; False otherwise.
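
Combining segments can be pictured as merging date intervals. The sketch below merges overlapping or directly adjacent `(start, end)` ranges; the tuple representation, the one-day adjacency rule, and the name `merge_segments` are assumptions, since this reference does not describe how segments are stored.

```python
from datetime import date, timedelta


def merge_segments(segments):
    """Merge overlapping or adjacent (start, end) date ranges.

    Two ranges are treated as adjacent when the second starts no later
    than one day after the first ends (assumption for daily data).
    """
    merged = []
    for start, end in sorted(segments):
        if merged and start <= merged[-1][1] + timedelta(days=1):
            prev_start, prev_end = merged[-1]
            # Extend the previous range instead of adding a new one.
            merged[-1] = (prev_start, max(prev_end, end))
        else:
            merged.append((start, end))
    return merged
```

Keeping the stored segments merged makes a later coverage check a simple containment test.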

Method: write_timeseries_data_to_mongo

def write_timeseries_data_to_mongo(
            **kwargs
        ) -> None

Write timeseries data to MongoDB using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for MongoDB.

Returns:

Type Description
None None

Method: write_fundamental_data_to_mongo

def write_fundamental_data_to_mongo(
            **kwargs
        ) -> None

Write fundamental data to MongoDB using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for MongoDB.

Method: read_fundamental_data_from_mongo

def read_fundamental_data_from_mongo(
            **kwargs
        )

Read fundamental data from MongoDB using flexible arguments, including metadata for filtering.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for MongoDB.

Method: read_data_from_redis

def read_data_from_redis(
            **kwargs
        ) -> list

Read data from Redis using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for Redis.

Returns:

Type Description
list A list of data records read from Redis.

Method: write_data_to_redis

def write_data_to_redis(
            **kwargs
        ) -> None

Write data to Redis using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for Redis.

Method: read_timeseries_data_from_redis

def read_timeseries_data_from_redis(
            **kwargs
        )

Read timeseries data from Redis using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for Redis.

Method: write_timeseries_data_to_redis

def write_timeseries_data_to_redis(
            **kwargs
        )

Write timeseries data to Redis using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for Redis.

Method: write_metadata_to_redis

def write_metadata_to_redis(
            **kwargs
        ) -> None

Write metadata to Redis using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for Redis.

Method: read_metadata_from_redis

def read_metadata_from_redis(
            **kwargs
        ) -> dict

Read metadata from Redis using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for Redis.

Returns:

Type Description
dict A dictionary containing the metadata read from Redis.

Method: read_data_from_kdb

def read_data_from_kdb(
            **kwargs
        ) -> list

Read data from kdb+ using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for kdb+.

Returns:

Type Description
list A list of data records read from kdb+.

Method: write_data_to_kdb

def write_data_to_kdb(
            **kwargs
        ) -> None

Store data in kdb+ using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for kdb+.

Method: read_data_from_arctic

def read_data_from_arctic(
            **kwargs
        ) -> list

Read data from the database (ArcticDB) using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for the database.

Returns:

Type Description
list A list of data records read from the database.

Method: write_data_to_arctic

def write_data_to_arctic(
            **kwargs
        ) -> None

Write data to the Arctic library using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for the Arctic library.

Method: read_data_from_s3

def read_data_from_s3(
            **kwargs
        ) -> list

Read data from AWS S3 using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for AWS S3.

Returns:

Type Description
list A list of data records read from AWS S3.

Method: write_data_to_s3

def write_data_to_s3(
            **kwargs
        ) -> None

Write data to AWS S3 using flexible arguments.

Parameters:

Name Type Description
**kwargs None Flexible keyword arguments for AWS S3.