Source code for SatelliteCameraViewer.StarCatalog.catalog

""" StarCatalog """

import os
import time
from pathlib import Path
import hmac
import hashlib
import pickle
import sqlite3
import requests

from .star import Star
from .user_agent import user_agent

# Used by each catalog class - the class only needs to provide a _readstarfile() method

class Catalog():
    """
    Base class shared by every star catalog.

    A concrete catalog subclasses this and provides ``_readstarfile()``;
    it normally also sets ``base_url`` and ``source_files`` so the raw
    catalog files can be downloaded when they are missing.

    Stars are loaded lazily on first access. Three stores are consulted:

    * an HMAC-signed pickle cache (``<name>.pickle`` / ``<name>.sha256``),
    * optionally an sqlite database (``<name>.db``),
    * the raw catalog files, parsed by ``_readstarfile()``.

    All of them live in ``<directory>/<name>``.
    """

    # Yeah, yeah, yeah, this "known key" isn't optimal - but it's fit for purpose here.
    # We are only protecting from corrupt file systems - which isn't really a thing anymore.
    _NOT_REALLY_A_SECRET_KEY = b'c351e1e1-4e9b-45f8-bf74-b08f5f13e9a9'

    log = None

    # all star catalogs live here:
    _DIR_STAR_CATALOG = '~/.cache/star-catalog'

    # subclasses set these so the raw files can be fetched from the web
    base_url = None
    source_files = None

    # seconds to wait on the web server before giving up on a download
    _HTTP_TIMEOUT = 60

    def __init__(self, log, max_mag=None, directory=None, force_reload=False, use_database=False):
        """
        Set up the catalog; no stars are read until they are first needed.

        :param log: logger-like object (stored on the class, as before)
        :param max_mag: magnitude limit handed to ``_readstarfile()``
        :param directory: base directory (str or Path); falls back to the
            ``STAR_CATALOG`` environment variable, then ``~/.cache/star-catalog``
        :param force_reload: ignore cached data and re-download/re-parse
        :param use_database: keep the stars in an sqlite database
        :raises FileNotFoundError: when the base directory does not exist
        """
        self.__class__.log = log
        self._name = self.__class__.__name__.replace('_', '-').replace('Catalog', '')
        self._max_mag = max_mag
        self._force_reload = force_reload
        self._use_database = use_database

        if not directory:
            directory = os.getenv('STAR_CATALOG')
        if directory:
            # callers and the environment hand us plain strings; directory()
            # joins with "/" so this must be a Path
            self._directory = Path(directory)
        else:
            self._directory = Path(self._DIR_STAR_CATALOG).expanduser()

        if not self._directory.exists():
            raise FileNotFoundError(self._directory)

        catalog_dir = self.directory()
        if not catalog_dir.exists():
            catalog_dir.mkdir(exist_ok=True)

        self._key = self._NOT_REALLY_A_SECRET_KEY
        self._db = None
        # None means "not loaded yet"; an empty list is a loaded, empty catalog
        self._stars = None

    def __len__(self):
        """ Number of stars (loads the catalog on first use). """
        if self._stars is None:
            self._load_stars()
        return len(self._stars)

    def __call__(self):
        """ Same as stars(). """
        return self.stars()

    def __str__(self):
        """ Short description: catalog name and length. """
        return '[Star Catalog %s: length=%d]' % (self.name(), len(self))

    def __repr__(self):
        """ Description including the directory the catalog lives in. """
        return '[Star Catalog %s: length=%d from %s]' % (self.name(), len(self), self.directory())

    def stars(self):
        """ Return the list of stars (loads the catalog on first use). """
        if self._stars is None:
            self._load_stars()
        return self._stars

    def name(self):
        """ Return the catalog name derived from the class name. """
        return self._name

    def directory(self):
        """
        Return the catalog's own directory, ``<base directory>/<name>``.

        :raises FileNotFoundError: when no base directory is set
        """
        if not self._directory:
            raise FileNotFoundError(self._directory)
        return self._directory / self._name

    def _star_set(self, v=None):
        """ Replace the in-memory star list; a falsy value clears it. """
        self._stars = v if v else []

    def _star_append(self, star):
        """ Append one star to the in-memory star list. """
        self._stars.append(star)

    def _load_stars(self):
        """
        Populate the in-memory star list from the best available store.

        A missing source file turns the load into a forced reload. The raw
        files are primed at most once per call; with a database the stars
        come from it when it already has rows, otherwise it is filled.
        """
        if not self._force_reload and not self._files_exist():
            self._force_reload = True

        primed = False
        if self._force_reload:
            # refreshes the raw files and the pickle cache
            self._prime_from_files()
            primed = True

        if self._use_database:
            count = self._database_open(fresh=self._force_reload)
            if count is not None and count > 0:
                self._database_read()
            else:
                if not primed:
                    self._prime_from_files(dont_pickle=True)
                self._database_write()
        elif not primed:
            self._prime_from_files()

        self.__class__.log.info('%s opened from %s, %s records found', self.name(), self.directory(), format(len(self), ','))

    def _prime_from_web(self):
        """
        Download every file in ``source_files`` from ``base_url``.

        Best effort: a file that fails to download or save is logged and
        skipped.

        :raises NotImplementedError: when the subclass gave no URL or files
        """
        if not self.base_url or not self.source_files:
            raise NotImplementedError

        log = self.__class__.log
        for filename in self.source_files:
            url = self.base_url + filename
            file_path = self.directory() / filename
            log.debug('start download url=%s, file=%s', url, filename)
            # The header is set to stop the file being un-gzipped in transfer
            headers = {
                'Accept-Encoding': 'identity',
                'User-Agent': user_agent(),
            }
            try:
                response = requests.get(url, headers=headers, stream=True, timeout=self._HTTP_TIMEOUT)
            except requests.RequestException as err:
                log.debug('web requests() failed err=%s', err)
                continue

            # closing the response releases the streamed connection
            with response:
                if not response.ok:
                    # never store an HTTP error page as catalog data
                    log.debug('web requests() failed err=%s', 'HTTP status %d' % (response.status_code))
                    continue
                n_bytes = 0
                try:
                    with open(file_path, 'wb') as fd:
                        # this method will stop requests() from un-gzipping the contents!
                        for chunk in response.iter_content(chunk_size=16*1024):
                            fd.write(chunk)
                            n_bytes += len(chunk)
                except FileNotFoundError:
                    log.debug('file save failed, file=%s', file_path)
                    continue
            log.debug('download complete, file=%s, len=%d', file_path, n_bytes)

    def _files_exist(self):
        """ True when every listed source file is present (or none are listed). """
        catalog_dir = self.directory()
        return all(os.path.exists(catalog_dir / filename) for filename in (self.source_files or ()))

    def _files_age(self, suffix):
        """
        Age in whole seconds of ``<name><suffix>``, or None if it can't be stat'ed.
        """
        filename = (self.directory() / self.name().lower()).with_suffix(suffix)
        try:
            return int(time.time() - os.stat(filename).st_mtime)
        except OSError:
            return None

    def _prime_from_files(self, dont_pickle=False):
        """
        Fill the star list from the pickle cache or, failing that, the raw files.

        :param dont_pickle: skip writing the pickle cache after a raw parse
        :returns: number of stars loaded (0 on failure)
        """
        log = self.__class__.log

        if not self._force_reload:
            try:
                self._readpickle()
                return len(self._stars)
            except (FileNotFoundError, ValueError):
                # no usable cache (missing or failed its signature check)
                # no worries - we continue with the raw files
                pass

        if self._force_reload or not self._files_exist():
            log.debug('the very slow path - base star files downloading')
            self._prime_from_web()

        log.debug('the slow path - base star files used')

        # zero out the star database
        self._star_set(None)

        # fill the star database
        try:
            # _readstarfile will write into _stars
            n_lines = self._readstarfile(self.directory(), self._max_mag, self._star_append)
        except FileNotFoundError:
            log.error('star catalog file: %s', self.directory())
            return 0
        log.debug('%s from %s with %s records', self.name(), self.directory(), n_lines)

        if n_lines == 0 or len(self._stars) == 0:
            # got nada!
            log.error('star catalog file: got zero lines, dir=%s', self.directory())
            return 0

        log.debug('%s with %d records found', self.name(), n_lines)

        # we could do this - but for now, we don't
        # self._star_set(sorted(self._stars, key=lambda v: (v.mag)))

        # save it all away for later use - because we only get here if there's no pickle file
        if not dont_pickle:
            self._writepickle()

        return len(self._stars)

    def _readstarfile(self, directory, max_mag, star_append):
        """
        Parse the raw catalog files; implemented by the catalog-specific code.

        Called with the catalog directory, the magnitude limit and the
        callback that stores one star; the return value is used as the
        number of lines read.
        """
        raise NotImplementedError

    def _readpickle(self):
        """
        Load the star list from the signed pickle cache.

        :raises FileNotFoundError: pickle or signature file is missing
        :raises ValueError: the signature does not match the pickle
        """
        log = self.__class__.log
        base = self.directory() / self.name().lower()

        # read in the pickle file (FileNotFoundError propagates to the caller)
        with open(base.with_suffix('.pickle'), 'rb') as fd:
            stars_b = fd.read()

        # check digest and only return stars if correct
        expected = hmac.new(self._key, stars_b, hashlib.sha256).hexdigest()
        try:
            with open(base.with_suffix('.sha256'), 'r', encoding='utf-8') as fd:
                found = fd.read().rstrip()
        except FileNotFoundError as err:
            log.error('sig file: %s %s %s', type(err).__name__, err, self.directory())
            raise

        # compare as bytes: compare_digest() rejects non-ASCII str input
        if not hmac.compare_digest(expected.encode('ascii'), found.encode('utf-8')):
            # Danger Will Robinson - ignore it all!
            log.error('sig file mismatch: signature %s vs %s', expected, found)
            raise ValueError('sig file mismatch')

        # We have a correct sig - lets proceed!
        # NOTE(security): the key is public, so the signature only detects
        # corruption. Unpickling still trusts whoever can write this directory.
        stars = pickle.loads(stars_b)

        # yippee, we can use the saved away data
        log.debug('yippee - pickle file used')
        self._star_set(stars)

    def _writepickle(self):
        """ Write the star list to the pickle cache together with its signature. """
        stars_b = pickle.dumps(self._stars)
        base = self.directory() / self.name().lower()

        # write the pickle file
        try:
            with open(base.with_suffix('.pickle'), 'wb') as fd:
                fd.write(stars_b)
        except FileNotFoundError:
            return

        # write digest based on stars
        signature = hmac.new(self._key, stars_b, hashlib.sha256).hexdigest()
        with open(base.with_suffix('.sha256'), 'w', encoding='utf-8') as fd:
            fd.write(signature)
            fd.write('\n')
        self.__class__.log.debug('signature written = %s', signature)

    def _database_open(self, memory=False, shared=False, fresh=False):
        """
        Open the sqlite database and make sure the ``stars`` table exists.

        :param memory: use an in-memory database instead of ``<name>.db``
        :param shared: with ``memory``, use the shared-cache in-memory database
        :param fresh: drop any existing table first
        :returns: row count, 0 for a fresh table, or None if already open
        :raises sqlite3.OperationalError: the database cannot be opened
        """
        if self._db:
            # we've already returned a count on the previous try - let's not do it again
            return None

        uri = False
        if memory:
            if shared:
                # a shared in-memory database needs URI syntax; the bare name
                # would be taken as an on-disk filename
                filename = 'file::memory:?cache=shared'
                uri = True
            else:
                filename = ':memory:'
        else:
            filename = (self.directory() / self.name().lower()).with_suffix('.db')
            if not os.path.exists(filename):
                fresh = True

        try:
            self._db = sqlite3.connect(filename, uri=uri)
        except sqlite3.OperationalError as err:
            self.__class__.log.error('db file: %s %s %s', type(err).__name__, err, self.directory())
            raise

        cur = self._db.cursor()
        if (memory and not shared) or fresh:
            cur.execute('DROP TABLE IF EXISTS stars')
        cur.execute('CREATE TABLE IF NOT EXISTS stars(number INTEGER, name_star TEXT, name_constellation TEXT, ra REAL, dec REAL, mag REAL)')
        self._db.commit()
        if fresh:
            return 0

        cur.execute('SELECT COUNT(*) FROM stars')
        count = cur.fetchone()[0]
        self.__class__.log.info('database %s opened from %s, %s records found', self.name(), self.directory(), format(count, ','))
        return count

    def _database_read(self):
        """ Replace the in-memory star list with the rows of the ``stars`` table. """
        self._star_set(None)
        cur = self._db.cursor()
        for row in cur.execute('SELECT * FROM stars'):
            # should deal with quotes here
            self._star_append(Star(*row))

    def _database_write(self):
        """ Insert every in-memory star into the ``stars`` table. """
        cur = self._db.cursor()
        data = []
        # read the list directly: going through stars() could re-enter the loader
        for star in self._stars or []:
            s = star()
            if s[0] and not isinstance(s[0], int):
                # a non-integer first field is indexed; its second item is stored
                s = (s[0][1], s[1], s[2], s[3], s[4], s[5])
            data.append(s)
        cur.executemany('INSERT INTO stars VALUES(?, ?, ?, ?, ?, ?)', data)
        self._db.commit()
        self.__class__.log.info('database insert %s records', format(len(data), ','))

    def _database_truncate(self):
        """ Remove every row from the ``stars`` table. """
        cur = self._db.cursor()
        # sqlite has no TRUNCATE TABLE statement; DELETE without WHERE empties the table
        cur.execute('DELETE FROM stars')
        self._db.commit()
        self.__class__.log.info('database truncate')

    def _database_dump(self):
        """ Print the first 20 rows ordered by magnitude (database mode only). """
        if not self._use_database or self._db is None:
            return
        cur = self._db.cursor()
        for row in cur.execute('SELECT * FROM stars ORDER BY mag LIMIT 20'):
            print('%s' % (Star(*row)))
        self._db.commit()