Skip to content

Commit

Permalink
Add tile pool to Frame
Browse files Browse the repository at this point in the history
  • Loading branch information
Pam Harris committed May 13, 2024
1 parent 640aee0 commit 418b79a
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 29 deletions.
51 changes: 26 additions & 25 deletions src/Frame/Frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Frame::Frame(uint32_t session_id, std::shared_ptr<FileLoader> loader, const std:
_depth(1),
_num_stokes(1),
_image_cache_valid(false),
_use_tile_cache(false),
_moment_generator(nullptr),
_moment_name_index(0) {
// Initialize for operator==
Expand Down Expand Up @@ -83,15 +84,17 @@ Frame::Frame(uint32_t session_id, std::shared_ptr<FileLoader> loader, const std:
_depth = (_z_axis >= 0 ? _image_shape(_z_axis) : 1);
_num_stokes = (_stokes_axis >= 0 ? _image_shape(_stokes_axis) : 1);

_use_tile_cache = _loader->UseTileCache();

// load full image cache for loaders that don't use the tile cache and mipmaps
if (load_image_cache && !(_loader->UseTileCache() && _loader->HasMip(2)) && !FillImageCache()) {
if (load_image_cache && !(_use_tile_cache && _loader->HasMip(2)) && !FillImageCache()) {
_open_image_error = fmt::format("Cannot load image data. Check log.");
_valid = false;
return;
}

// reset the tile cache if the loader will use it
if (_loader->UseTileCache()) {
if (_use_tile_cache) {
int tiles_x = (_width - 1) / TILE_SIZE + 1;
int tiles_y = (_height - 1) / TILE_SIZE + 1;
int tile_cache_capacity = std::min(MAX_TILE_CACHE_CAPACITY, 2 * (tiles_x + tiles_y));
Expand Down Expand Up @@ -342,13 +345,13 @@ bool Frame::SetImageChannels(int new_z, int new_stokes, std::string& message) {
_z_index = new_z;
_stokes_index = new_stokes;

if (!(_loader->UseTileCache() && _loader->HasMip(2)) || IsComputedStokes(_stokes_index)) {
if (!(_use_tile_cache && _loader->HasMip(2)) || IsComputedStokes(_stokes_index)) {
// Reload the full channel cache for loaders which use it
FillImageCache();
} else {
// Don't reload the full channel cache here because we may not need it

if (_loader->UseTileCache()) {
if (_use_tile_cache) {
// invalidate / clear the full resolution tile cache
_tile_cache.Reset(_z_index, _stokes_index);
}
Expand Down Expand Up @@ -466,7 +469,7 @@ bool Frame::GetRasterData(std::vector<float>& image_data, CARTA::ImageBounds& bo

// Tile data
bool Frame::FillRasterTileData(CARTA::RasterTileData& raster_tile_data, const Tile& tile, int z, int stokes,
CARTA::CompressionType compression_type, float compression_quality) {
CARTA::CompressionType compression_type, float compression_quality, int num_threads) {
// Early exit if z or stokes has changed
if (ZStokesChanged(z, stokes)) {
return false;
Expand All @@ -480,6 +483,11 @@ bool Frame::FillRasterTileData(CARTA::RasterTileData& raster_tile_data, const Ti
raster_tile_data.clear_tiles();
}

if (!_tile_pool) {
_tile_pool = std::make_shared<TilePool>();
_tile_pool->Grow(num_threads);
}

CARTA::TileData* tile_ptr = raster_tile_data.add_tiles();
tile_ptr->set_layer(tile.layer);
tile_ptr->set_x(tile.x);
Expand All @@ -500,10 +508,7 @@ bool Frame::FillRasterTileData(CARTA::RasterTileData& raster_tile_data, const Ti
tile_ptr->set_image_data(tile_data_ptr->data(), sizeof(float) * tile_data_ptr->size());
return true;
} else if (compression_type == CARTA::CompressionType::ZFP) {
// GetNanEncodingsBlock changes tile data (cached in TileCache)!
std::vector<float> tile_data_to_compress;
tile_data_to_compress.assign(tile_data_ptr->begin(), tile_data_ptr->end());
auto nan_encodings = GetNanEncodingsBlock(tile_data_to_compress, 0, tile_width, tile_height);
auto nan_encodings = GetNanEncodingsBlock(*tile_data_ptr, 0, tile_width, tile_height);
tile_ptr->set_nan_encodings(nan_encodings.data(), sizeof(int32_t) * nan_encodings.size());

if (ZStokesChanged(z, stokes)) {
Expand All @@ -516,16 +521,15 @@ bool Frame::FillRasterTileData(CARTA::RasterTileData& raster_tile_data, const Ti
std::vector<char> compression_buffer;
size_t compressed_size;
int precision = lround(compression_quality);
Compress(tile_data_to_compress, 0, compression_buffer, compressed_size, tile_width, tile_height, precision);
Compress(*tile_data_ptr, 0, compression_buffer, compressed_size, tile_width, tile_height, precision);
float compression_ratio = (float)tile_image_data_size / (float)compressed_size;
bool use_high_precision(false);

if (precision < HIGH_COMPRESSION_QUALITY && compression_ratio > 20) {
// re-compress the data with a higher precision
std::vector<char> compression_buffer_hq;
size_t compressed_size_hq;
Compress(
tile_data_to_compress, 0, compression_buffer_hq, compressed_size_hq, tile_width, tile_height, HIGH_COMPRESSION_QUALITY);
Compress(*tile_data_ptr, 0, compression_buffer_hq, compressed_size_hq, tile_width, tile_height, HIGH_COMPRESSION_QUALITY);
float compression_ratio_hq = (float)tile_image_data_size / (float)compressed_size_hq;

if (compression_ratio_hq > 10) {
Expand Down Expand Up @@ -576,27 +580,24 @@ bool Frame::GetRasterTileData(std::shared_ptr<std::vector<float>>& tile_data_ptr
width = std::ceil((float)req_width / mip);
height = std::ceil((float)req_height / mip);

std::vector<float> tile_data;
tile_data_ptr = _tile_pool->Pull();
bool loaded_data(0);

if (mip > 1 && !IsComputedStokes(_stokes_index)) {
// Try to load downsampled data from the image file
loaded_data = _loader->GetDownsampledRasterData(tile_data, _z_index, _stokes_index, bounds, mip, _image_mutex);
} else if (!_image_cache_valid && _loader->UseTileCache()) {
// Load a tile from the tile cache only if this is supported *and* the full image cache isn't populated
tile_data_ptr = _tile_cache.Get(TileCache::Key(bounds.x_min(), bounds.y_min()), _loader, _image_mutex);
if (tile_data_ptr) {
loaded_data = _loader->GetDownsampledRasterData(*tile_data_ptr, _z_index, _stokes_index, bounds, mip, _image_mutex);
} else if (!_image_cache_valid && _use_tile_cache) {
// Load a tile from the tile cache if the full image cache isn't populated
auto cache_tile_ptr = _tile_cache.Get(TileCache::Key(bounds.x_min(), bounds.y_min()), _loader, _image_mutex);
if (cache_tile_ptr) {
tile_data_ptr->assign(cache_tile_ptr->begin(), cache_tile_ptr->end());
return true;
}
}

// Fall back to using the full image cache.
if (!loaded_data) {
loaded_data = GetRasterData(tile_data, bounds, mip, true);
}

if (loaded_data) {
tile_data_ptr = std::make_shared<std::vector<float>>(tile_data);
loaded_data = GetRasterData(*tile_data_ptr, bounds, mip, true);
}

return loaded_data;
Expand Down Expand Up @@ -1084,7 +1085,7 @@ bool Frame::FillSpatialProfileData(PointXy point, std::vector<CARTA::SetSpatialR
queuing_rw_mutex_scoped cache_lock(&_cache_mutex, write_lock);
cursor_value_with_current_stokes = _image_cache[(y * _width) + x];
cache_lock.release();
} else if (_loader->UseTileCache()) {
} else if (_use_tile_cache) {
int tile_x = tile_index(x);
int tile_y = tile_index(y);
auto tile = _tile_cache.Get(TileCache::Key(tile_x, tile_y), _loader, _image_mutex);
Expand Down Expand Up @@ -1195,7 +1196,7 @@ bool Frame::FillSpatialProfileData(PointXy point, std::vector<CARTA::SetSpatialR
}

if (is_current_stokes) {
if (_loader->UseTileCache()) { // Use tile cache to return full resolution data or prepare data for decimation
if (_use_tile_cache) { // Use tile cache to return full resolution data or prepare data for decimation
profile.resize(end - start);

if (config.coordinate().back() == 'x') {
Expand Down
8 changes: 6 additions & 2 deletions src/Frame/Frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class Frame {

// Raster data
bool FillRasterTileData(CARTA::RasterTileData& raster_tile_data, const Tile& tile, int z, int stokes,
CARTA::CompressionType compression_type, float compression_quality);
CARTA::CompressionType compression_type, float compression_quality, int num_threads);

// Functions used for smoothing and contouring
bool SetContourParameters(const CARTA::SetContourParameters& message);
Expand Down Expand Up @@ -303,10 +303,14 @@ class Frame {
queuing_rw_mutex _cache_mutex; // allow concurrent reads but lock for write
std::mutex _image_mutex; // only one disk access at a time
bool _cache_loaded; // channel cache is set
TileCache _tile_cache; // cache for full-resolution image tiles
std::mutex _ignore_interrupt_X_mutex;
std::mutex _ignore_interrupt_Y_mutex;

// Tile data
bool _use_tile_cache;
TileCache _tile_cache; // cache for full-resolution image tiles
std::shared_ptr<TilePool> _tile_pool; // memory allocated for tile data

// Use a shared lock for long time calculations, use an exclusive lock for the object destruction
mutable std::shared_mutex _active_task_mutex;

Expand Down
4 changes: 2 additions & 2 deletions src/Session/Session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -705,8 +705,8 @@ void Session::OnAddRequiredTiles(const CARTA::AddRequiredTiles& message, int ani
const auto& encoded_coordinate = message.tiles(i);
auto raster_tile_data = Message::RasterTileData(file_id, sync_id, animation_id);
auto tile = Tile::Decode(encoded_coordinate);
if (_frames.count(file_id) &&
_frames.at(file_id)->FillRasterTileData(raster_tile_data, tile, z, stokes, compression_type, compression_quality)) {
if (_frames.count(file_id) && _frames.at(file_id)->FillRasterTileData(
raster_tile_data, tile, z, stokes, compression_type, compression_quality, num_threads)) {
// Only use deflate on outgoing message if the raster image compression type is NONE
SendFileEvent(
file_id, CARTA::EventType::RASTER_TILE_DATA, 0, raster_tile_data, compression_type == CARTA::CompressionType::NONE);
Expand Down

0 comments on commit 418b79a

Please sign in to comment.