
""" Functions related to writing to and retrieving from cache files
"""
import os

import numpy as np

from .log_yatsm import logger

_image_ID_str = 'image_IDs'


def get_line_cache_name(dataset_config, n_images, row, nbands):
    """ Returns cache filename for specified config and line number

    Args:
        dataset_config (dict): configuration information about the dataset
        n_images (int): number of images in dataset
        row (int): line of the dataset for output
        nbands (int): number of bands in dataset

    Returns:
        str: filename of cache file

    """
    path = dataset_config.get('cache_line_dir')
    if not path:
        return

    filename = 'yatsm_r%i_n%i_b%i.npy.npz' % (row, n_images, nbands)

    return os.path.join(path, filename)
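
# A hedged usage sketch: with a hypothetical ``dataset_config`` whose
# 'cache_line_dir' is '/tmp/cache', the cache filename for row 50 of a
# 400-image, 8-band dataset would be:
#
#   >>> get_line_cache_name({'cache_line_dir': '/tmp/cache'}, 400, 50, 8)
#   '/tmp/cache/yatsm_r50_n400_b8.npy.npz'
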
def get_line_cache_pattern(row, nbands, regex=False):
    """ Returns a pattern for a cache file from a certain row

    This function is useful for finding all cache files from a line,
    ignoring the number of images in the file.

    Args:
        row (int): line of the dataset for output
        nbands (int): number of bands in dataset
        regex (bool, optional): return a regular expression instead of glob
            style (default: False)

    Returns:
        str: filename pattern for cache files from line ``row``

    """
    wildcard = '.*' if regex else '*'
    pattern = 'yatsm_r{l}_n{w}_b{b}.npy.npz'.format(
        l=row, w=wildcard, b=nbands)

    return pattern
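
# A hedged sketch of pairing the pattern with ``glob`` to find every cache
# file for a row, regardless of how many images each was built from (the
# '/tmp/cache' directory is assumed):
#
#   >>> import glob
#   >>> get_line_cache_pattern(50, 8)
#   'yatsm_r50_n*_b8.npy.npz'
#   >>> glob.glob(os.path.join('/tmp/cache', get_line_cache_pattern(50, 8)))
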
def test_cache(dataset_config):
    """ Test cache directory for ability to read from or write to

    Args:
        dataset_config (dict): dictionary of dataset configuration options

    Returns:
        tuple: tuple of bools describing ability to read from and write to
            cache directory

    """
    # Try to find / use cache
    read_cache = False
    write_cache = False

    cache_dir = dataset_config.get('cache_line_dir')
    if cache_dir:
        # Test existence
        if os.path.isdir(cache_dir):
            if os.access(cache_dir, os.R_OK):
                read_cache = True
            if os.access(cache_dir, os.W_OK):
                write_cache = True
            if read_cache and not write_cache:
                logger.warning('Cache directory exists but is not writable')
        else:
            # If it doesn't already exist, can we create it?
            try:
                os.makedirs(cache_dir)
            except OSError:
                logger.warning('Could not create cache directory')
            else:
                read_cache = True
                write_cache = True

    logger.debug('Attempt reading in from cache directory?: {b}'.format(
        b=read_cache))
    logger.debug('Attempt writing to cache directory?: {b}'.format(
        b=write_cache))

    return read_cache, write_cache
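
# A minimal sketch of using the returned flags to decide on caching
# behavior (``dataset_config`` is assumed to come from the parsed YATSM
# configuration):
#
#   >>> read_cache, write_cache = test_cache(dataset_config)
#   >>> if read_cache:
#   ...     pass  # e.g., try read_cache_file() before reading from disk
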
def read_cache_file(cache_filename, image_IDs=None):
    """ Returns image data from a cache file

    If ``image_IDs`` is not None, this function will try to ensure that the
    data in the cache file come from the list of image IDs provided. If the
    cache file does not contain a list of image IDs, it will skip the check
    and return the cached data.

    Args:
        cache_filename (str): cache filename
        image_IDs (iterable, optional): list of image IDs corresponding to
            data in cache file. If not specified, function will not check
            for correspondence (default: None)

    Returns:
        np.ndarray, or None: Return Y as np.ndarray if possible and if the
            cache file passes the consistency check specified by
            ``image_IDs``, else None

    """
    try:
        cache = np.load(cache_filename)
    except IOError:
        return None

    if _image_ID_str in cache.files and image_IDs is not None:
        if not np.array_equal(image_IDs, cache[_image_ID_str]):
            logger.warning('Cache file data in {f} do not match images '
                           'specified'.format(f=cache_filename))
            return None

    return cache['Y']
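
# A hedged sketch: try the cache first and fall back to reading from disk
# when the file is missing or fails the image ID check (``image_IDs`` and
# ``read_from_disk`` are hypothetical):
#
#   >>> Y = read_cache_file('/tmp/cache/yatsm_r50_n400_b8.npy.npz',
#   ...                     image_IDs=image_IDs)
#   >>> if Y is None:
#   ...     Y = read_from_disk()  # cache miss or inconsistent image IDs
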
def write_cache_file(cache_filename, Y, image_IDs):
    """ Writes data to a cache file using np.savez_compressed

    Args:
        cache_filename (str): cache filename
        Y (np.ndarray): data to write to cache file
        image_IDs (iterable): list of image IDs corresponding to data in
            cache file

    """
    np.savez_compressed(cache_filename, **{
        'Y': Y,
        _image_ID_str: image_IDs
    })
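
# A round-trip sketch under assumed shapes -- Y is (nband, n_images, ncol):
#
#   >>> Y = np.zeros((8, 400, 250), dtype=np.int16)  # hypothetical data
#   >>> IDs = ['image_%03i' % i for i in range(400)]
#   >>> write_cache_file('/tmp/cache/yatsm_r50_n400_b8.npy.npz', Y, IDs)
#   >>> np.array_equal(
#   ...     read_cache_file('/tmp/cache/yatsm_r50_n400_b8.npy.npz', IDs), Y)
#   True
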
# Cache file updating
def update_cache_file(images, image_IDs,
                      old_cache_filename, new_cache_filename,
                      line, reader):
    """ Modify an existing cache file to contain data within ``images``

    This should be useful for updating a set of cache files to reflect
    modifications to the timeseries dataset without completely reading the
    data into another cache file. For example, the cache file could be
    updated to reflect the deletion of a misregistered or cloudy image.
    Another common example would be updating cache files to include newly
    acquired observations.

    Note that this updater will not handle updating cache files to include
    new bands.

    Args:
        images (iterable): list of new image filenames
        image_IDs (iterable): list of new image identifying strings
        old_cache_filename (str): filename of cache file to update
        new_cache_filename (str): filename of new cache file which includes
            modified data
        line (int): the line of data to be updated
        reader (callable): GDAL or BIP image reader function from
            :mod:`yatsm.io.stack_line_readers`

    Raises:
        ValueError: Raise error if old cache file does not record
            ``image_IDs``

    """
    images = np.asarray(images)
    image_IDs = np.asarray(image_IDs)

    # Cannot proceed if old cache file doesn't store filenames
    old_cache = np.load(old_cache_filename)
    if _image_ID_str not in old_cache.files:
        raise ValueError('Cannot update cache. '
                         'Old cache file does not store image IDs.')
    old_IDs = old_cache[_image_ID_str]
    old_Y = old_cache['Y']
    nband, _, ncol = old_Y.shape

    # Create new Y and add in values retained from old cache
    new_Y = np.zeros((nband, image_IDs.size, ncol),
                     dtype=old_Y.dtype.type)
    new_IDs = np.zeros(image_IDs.size, dtype=image_IDs.dtype)

    # Check deletions -- find which indices to retain in new cache
    retain_old = np.where(np.in1d(old_IDs, image_IDs))[0]
    if retain_old.size == 0:
        logger.warning('No image IDs in common in old cache file.')
    else:
        logger.debug('    retaining {r} of {n} images'.format(
            r=retain_old.size, n=old_IDs.size))

        # Find indices of old data to insert into new data
        idx_old_IDs = np.argsort(old_IDs)
        sorted_old_IDs = old_IDs[idx_old_IDs]
        idx_IDs = np.searchsorted(sorted_old_IDs,
                                  image_IDs[np.in1d(image_IDs, old_IDs)])

        retain_old = idx_old_IDs[idx_IDs]

        # Indices to insert into new data
        retain_new = np.where(np.in1d(image_IDs, old_IDs))[0]

        new_Y[:, retain_new, :] = old_Y[:, retain_old, :]
        new_IDs[retain_new] = old_IDs[retain_old]

    # Check additions -- find which indices we need to insert
    insert = np.where(np.in1d(image_IDs, old_IDs, invert=True))[0]

    if retain_old.size == 0 and insert.size == 0:
        raise ValueError('Cannot update cache file -- '
                         'no data retained or added')

    # Read in the remaining data from disk
    if insert.size > 0:
        logger.debug('Inserting {n} new images into cache'.format(
            n=insert.size))
        insert_Y = reader.read_row(images[insert], line)

        new_Y[:, insert, :] = insert_Y
        new_IDs[insert] = image_IDs[insert]

    np.testing.assert_equal(new_IDs, image_IDs)

    # Save
    write_cache_file(new_cache_filename, new_Y, image_IDs)
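
# A hedged sketch of refreshing a row's cache after adding new acquisitions;
# the reader object (here ``gdal_reader``) and its location in
# yatsm.io.stack_line_readers are assumptions based on the docstring above,
# and ``images``/``image_IDs`` are hypothetical:
#
#   >>> from yatsm.io.stack_line_readers import gdal_reader
#   >>> update_cache_file(images, image_IDs,
#   ...                   '/tmp/cache/yatsm_r50_n400_b8.npy.npz',
#   ...                   '/tmp/cache/yatsm_r50_n402_b8.npy.npz',
#   ...                   50, gdal_reader)
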