Source code for pancloud.adapters.tinydb_adapter

# -*- coding: utf-8 -*-

"""TinyDB storage adapter."""
from __future__ import absolute_import

import os
from threading import RLock

from tinydb import TinyDB, Query

from .. import PanCloudError
from . import StorageAdapter


[docs]class TinyDBStore(StorageAdapter): def __init__(self, **kwargs): self._storage_params = kwargs.get('storage_params') or {} self.dbfile = self._storage_params.get('dbfile') self.query = Query() self.db = self.init_store() self.lock = RLock()
[docs] def fetch_credential(self, credential=None, profile=None): """Fetch credential from credentials file. Args: credential (str): Credential to fetch. profile (str): Credentials profile. Defaults to ``'default'``. Returns: str, None: Fetched credential or ``None``. """ q = self.db.get(self.query.profile == profile) if q is not None: return q.get(credential)
[docs] def init_store(self): if self.dbfile: dbfile = self.dbfile elif os.getenv('PAN_CREDENTIALS_DBFILE'): dbfile = os.getenv('PAN_CREDENTIALS_DBFILE') else: dbfile = os.path.join( os.path.expanduser('~'), '.config', 'pancloud', 'credentials.json' ) if not os.path.exists(os.path.dirname(dbfile)): try: os.makedirs(os.path.dirname(dbfile), 0o700) except OSError as e: raise PanCloudError("{}".format(e)) return TinyDB( dbfile, sort_keys=True, indent=4, default_table='profiles' )
[docs] def remove_profile(self, profile=None): """Remove profile from credentials file. Args: profile (str): Credentials profile to remove. Returns: list: List of affected document IDs. """ with self.db: return self.db.remove(self.query.profile == profile)
[docs] def write_credentials(self, credentials=None, profile=None, cache_token=None): """Write credentials. Write credentials to credentials file. Performs ``upsert``. Args: cache_token (bool): If ``True``, stores ``access_token`` in token store. Defaults to ``True``. credentials (class): Read-only credentials. profile (str): Credentials profile. Defaults to ``'default'``. Returns: int: Affected document ID. """ d = { 'profile': profile, 'client_id': credentials.client_id, 'client_secret': credentials.client_secret, 'refresh_token': credentials.refresh_token } if cache_token: d.update({'access_token': credentials.access_token}) with self.lock: return self.db.upsert( d, self.query.profile == profile )