Source code for mincepy.archive_factory
import logging
import pymongo
import pymongo.uri_parser
from . import historians
from . import plugins
__all__ = 'create_archive', 'create_historian'
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
[docs]def create_archive(uri: str):
"""Create an archive type based on a uri string"""
archive = None
if uri.startswith('mongodb'):
from . import mongo # pylint: disable=import-outside-toplevel
# Format is:
# mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[database][?options]]
parsed = pymongo.uri_parser.parse_uri(uri)
if not parsed.get('database', None):
raise ValueError("Failed to supply database on MongoDB uri: {}".format(uri))
client = pymongo.MongoClient(uri)
database = client[parsed['database']]
archive = mongo.MongoArchive(database)
if archive is None:
raise ValueError("Unknown archive string: {}".format(uri))
logger.info('Connected to archive with uri: %s', uri)
return archive
[docs]def create_historian(archive_uri: str, apply_plugins=True) -> historians.Historian:
"""Convenience function to create a standard historian directly from an archive URI"""
historian = historians.Historian(create_archive(archive_uri))
if apply_plugins:
historian.register_types(plugins.get_types())
return historian