Skip to content

Commit

Permalink
Async delete expired TS files
Browse files Browse the repository at this point in the history
  • Loading branch information
mapengfei53 committed Apr 22, 2022
1 parent 34ad906 commit 03c3a1f
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 35 deletions.
49 changes: 14 additions & 35 deletions trunk/src/app/srs_app_fragment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
//

#include <srs_app_fragment.hpp>
#include <srs_app_hybrid.hpp>

#include <srs_kernel_utility.hpp>
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>

#include <unistd.h>
#include <sstream>

using namespace std;

SrsFragment::SrsFragment()
Expand Down Expand Up @@ -216,45 +218,22 @@ void SrsFragmentWindow::clear_expired(bool delete_files, srs_utime_t delay_time)
{
srs_error_t err = srs_success;

if (!delete_files || delay_time == 0) {
std::vector<SrsFragment*>::iterator it;

for (it = expired_fragments.begin(); it != expired_fragments.end(); ++it) {
SrsFragment* fragment = *it;
if (delete_files && (err = fragment->unlink_file()) != srs_success) {
srs_warn("Unlink ts failed, %s", srs_error_desc(err).c_str());
srs_freep(err);
}
srs_freep(fragment);
}

expired_fragments.clear();
} else {
srs_utime_t duration = 0;

int remove_index = -1;

for (int i = (int)expired_fragments.size() - 1; i >= 0; i--) {
SrsFragment* fragment = expired_fragments[i];
duration += fragment->duration();

if (duration > delay_time) {
remove_index = i;
break;
}
}
std::vector<SrsFragment*>::iterator it;

for (int i = 0; i < remove_index && !expired_fragments.empty(); i++) {
SrsFragment* fragment = *expired_fragments.begin();
expired_fragments.erase(expired_fragments.begin());
for (it = expired_fragments.begin(); it != expired_fragments.end(); ++it) {
SrsFragment* fragment = *it;

if ((err = fragment->unlink_file()) != srs_success) {
srs_warn("Unlink ts failed, %s", srs_error_desc(err).c_str());
srs_freep(err);
}
srs_freep(fragment);
if (delete_files && delay_time != 0) {
SrsTsFragment *ts_fragment = new SrsTsFragment(delay_time, fragment->fullpath());
_srs_hybrid->clock_cleanUp->append(ts_fragment);
} else if (delete_files && (err = fragment->unlink_file()) != srs_success) {
srs_warn("Unlink ts failed, %s", srs_error_desc(err).c_str());
srs_freep(err);
}
srs_freep(fragment);
}

expired_fragments.clear();
}

srs_utime_t SrsFragmentWindow::max_duration()
Expand Down
77 changes: 77 additions & 0 deletions trunk/src/app/srs_app_hybrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
#include <srs_app_server.hpp>
#include <srs_app_config.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_service_st.hpp>
#include <srs_app_utility.hpp>

#include <unistd.h>

using namespace std;

extern SrsPps* _srs_pps_cids_get;
Expand Down Expand Up @@ -129,12 +132,16 @@ SrsHybridServer::SrsHybridServer()
timer5s_ = new SrsFastTimer("hybrid", 5 * SRS_UTIME_SECONDS);

clock_monitor_ = new SrsClockWallMonitor();

clock_cleanUp = new SrsCleanUpExpiredTs();
}

SrsHybridServer::~SrsHybridServer()
{
srs_freep(clock_monitor_);

srs_freep(clock_cleanUp);

srs_freep(timer20ms_);
srs_freep(timer100ms_);
srs_freep(timer1s_);
Expand Down Expand Up @@ -176,6 +183,7 @@ srs_error_t SrsHybridServer::initialize()

// Register some timers.
timer20ms_->subscribe(clock_monitor_);
timer100ms_->subscribe(clock_cleanUp);
timer5s_->subscribe(this);

// Initialize all hybrid servers.
Expand Down Expand Up @@ -381,5 +389,74 @@ srs_error_t SrsHybridServer::on_timer(srs_utime_t interval)
return err;
}

SrsTsFragment::SrsTsFragment(srs_utime_t time, std::string path)
{
die_at = srs_get_system_time();
delay_time = time;
filepath = path;
}

SrsTsFragment::~SrsTsFragment()
{
}

bool SrsTsFragment::expired()
{
srs_utime_t now = srs_get_system_time();
if (now > die_at + delay_time) {
return true;
}

return false;
}

srs_error_t SrsTsFragment::unlink_file()
{
srs_error_t err = srs_success;

if (::unlink(filepath.c_str()) < 0) {
return srs_error_new(ERROR_SYSTEM_FRAGMENT_UNLINK, "unlink %s", filepath.c_str());
}

return err;
}

SrsCleanUpExpiredTs::SrsCleanUpExpiredTs()
{
}

SrsCleanUpExpiredTs::~SrsCleanUpExpiredTs()
{
}

void SrsCleanUpExpiredTs::append(SrsTsFragment* fragment)
{
fragments.push_back(fragment);
}

srs_error_t SrsCleanUpExpiredTs::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;

std::list<SrsTsFragment*>::iterator it;
for (it = fragments.begin(); it != fragments.end();) {
SrsTsFragment* fragment = *it;

if (fragment->expired()) {
if ((err = fragment->unlink_file()) != srs_success) {
srs_warn("Unlink ts failed %s", srs_error_desc(err).c_str());
srs_freep(err);
}
srs_freep(fragment);

fragments.erase(it ++);
} else {
it ++;
}
}

return err;
}

SrsHybridServer* _srs_hybrid = NULL;

36 changes: 36 additions & 0 deletions trunk/src/app/srs_app_hybrid.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
#include <srs_core.hpp>

#include <vector>
#include <list>

#include <srs_app_hourglass.hpp>

class SrsServer;
class SrsServerAdapter;
class SrsWaitGroup;
class SrsCleanUpExpiredTs;

// The hibrid server interfaces, we could register many servers.
class ISrsHybridServer
Expand All @@ -42,6 +44,8 @@ class SrsHybridServer : public ISrsFastTimer
SrsFastTimer* timer1s_;
SrsFastTimer* timer5s_;
SrsClockWallMonitor* clock_monitor_;
public:
SrsCleanUpExpiredTs* clock_cleanUp;
public:
SrsHybridServer();
virtual ~SrsHybridServer();
Expand All @@ -62,6 +66,38 @@ class SrsHybridServer : public ISrsFastTimer
srs_error_t on_timer(srs_utime_t interval);
};

// The Ts fragment info.
class SrsTsFragment
{
private:
srs_utime_t die_at;
srs_utime_t delay_time;
std::string filepath;
public:
SrsTsFragment(srs_utime_t time, std::string path);
virtual ~SrsTsFragment();
public:
// Unlink ts file when expired.
virtual bool expired();
// Unlink the fragment, to delete the file.
virtual srs_error_t unlink_file();
};

// To clean up expired Ts file.
class SrsCleanUpExpiredTs : public ISrsFastTimer
{
private:
std::list<SrsTsFragment*> fragments;
public:
SrsCleanUpExpiredTs();
virtual ~SrsCleanUpExpiredTs();
public:
virtual void append(SrsTsFragment* fragment);
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval);
};

extern SrsHybridServer* _srs_hybrid;

#endif

0 comments on commit 03c3a1f

Please sign in to comment.