Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HLS: Added delay cleanup old expired ts files #3001

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,11 @@ vhost hls.srs.com {
# whether cleanup the old expired ts files.
# default: on
hls_cleanup on;
# whether delay cleanup the old expired ts files.
# when hls_cleanup is on, delay cleanup the old expired ts files in this timeout in seconds.
# @remark 0 to disable delay cleanup for publisher.
# default: 0
hls_delay_cleanup 0;
# If there is no incoming packets, dispose HLS in this timeout in seconds,
# which removes all HLS files including m3u8 and ts files.
# @remark 0 to disable dispose for publisher.
Expand Down
19 changes: 18 additions & 1 deletion trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2808,7 +2808,7 @@ srs_error_t SrsConfig::check_normal_config()
&& m != "hls_storage" && m != "hls_mount" && m != "hls_td_ratio" && m != "hls_aof_ratio" && m != "hls_acodec" && m != "hls_vcodec"
&& m != "hls_m3u8_file" && m != "hls_ts_file" && m != "hls_ts_floor" && m != "hls_cleanup" && m != "hls_nb_notify"
&& m != "hls_wait_keyframe" && m != "hls_dispose" && m != "hls_keys" && m != "hls_fragments_per_key" && m != "hls_key_file"
&& m != "hls_key_file_path" && m != "hls_key_url" && m != "hls_dts_directly") {
&& m != "hls_key_file_path" && m != "hls_key_url" && m != "hls_dts_directly" && m != "hls_delay_cleanup") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.hls.%s of %s", m.c_str(), vhost->arg0().c_str());
}

Expand Down Expand Up @@ -6301,6 +6301,23 @@ string SrsConfig::get_hls_key_url(std::string vhost)
return conf->arg0();
}

srs_utime_t SrsConfig::get_hls_delay_cleanup(string vhost)
{
static srs_utime_t DEFAULT = 0;

SrsConfDirective* conf = get_hls(vhost);
if (!conf) {
return DEFAULT;
}

conf = conf->get("hls_delay_cleanup");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}

return srs_utime_t(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS);
}

SrsConfDirective *SrsConfig::get_hds(const string &vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,8 @@ class SrsConfig
virtual int get_vhost_hls_nb_notify(std::string vhost);
// Whether turn the FLV timestamp to TS DTS.
virtual bool get_vhost_hls_dts_directly(std::string vhost);
// Get the delay time to clean up old ts files.
virtual srs_utime_t get_hls_delay_cleanup(std::string vhost);
// hds section
private:
// Get the hds directive of vhost.
Expand Down
16 changes: 11 additions & 5 deletions trunk/src/app/srs_app_fragment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
//

#include <srs_app_fragment.hpp>
#include <srs_app_hybrid.hpp>

#include <srs_kernel_utility.hpp>
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>

#include <unistd.h>
#include <sstream>

using namespace std;

SrsFragment::SrsFragment()
Expand Down Expand Up @@ -212,21 +214,25 @@ void SrsFragmentWindow::shrink(srs_utime_t window)
}
}

void SrsFragmentWindow::clear_expired(bool delete_files)
void SrsFragmentWindow::clear_expired(bool delete_files, srs_utime_t delay_time)
{
srs_error_t err = srs_success;

std::vector<SrsFragment*>::iterator it;

for (it = expired_fragments.begin(); it != expired_fragments.end(); ++it) {
SrsFragment* fragment = *it;
if (delete_files && (err = fragment->unlink_file()) != srs_success) {

if (delete_files && delay_time != 0) {
SrsTsFragment *ts_fragment = new SrsTsFragment(delay_time, fragment->fullpath());
_srs_hybrid->clock_cleanUp->append(ts_fragment);
} else if (delete_files && (err = fragment->unlink_file()) != srs_success) {
srs_warn("Unlink ts failed, %s", srs_error_desc(err).c_str());
srs_freep(err);
}
srs_freep(fragment);
}

expired_fragments.clear();
}

Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_fragment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class SrsFragmentWindow
// Shrink the window, push the expired fragment to a queue.
virtual void shrink(srs_utime_t window);
// Clear the expired fragments.
virtual void clear_expired(bool delete_files);
virtual void clear_expired(bool delete_files, srs_utime_t delay_time);
// Get the max duration in srs_utime_t of all fragments.
virtual srs_utime_t max_duration();
public:
Expand Down
17 changes: 12 additions & 5 deletions trunk/src/app/srs_app_hls.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ SrsHlsMuxer::SrsHlsMuxer()
current = NULL;
hls_keys = false;
hls_fragments_per_key = 0;
hls_delay_cleanup = 0;
async = new SrsAsyncCallWorker();
context = new SrsTsContext();
segments = new SrsFragmentWindow();
Expand Down Expand Up @@ -288,7 +289,8 @@ srs_error_t SrsHlsMuxer::on_unpublish()
srs_error_t SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix,
string path, string m3u8_file, string ts_file, srs_utime_t fragment, srs_utime_t window,
bool ts_floor, double aof_ratio, bool cleanup, bool wait_keyframe, bool keys,
int fragments_per_key, string key_file ,string key_file_path, string key_url)
int fragments_per_key, string key_file ,string key_file_path, string key_url,
srs_utime_t delay_cleanup)
{
srs_error_t err = srs_success;

Expand All @@ -313,6 +315,8 @@ srs_error_t SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix,
hls_key_file = key_file;
hls_key_file_path = key_file_path;
hls_key_url = key_url;

hls_delay_cleanup = delay_cleanup;

// generate the m3u8 dir and path.
m3u8_url = srs_path_build_stream(m3u8_file, req->vhost, req->app, req->stream);
Expand Down Expand Up @@ -659,7 +663,7 @@ srs_error_t SrsHlsMuxer::do_segment_close()
err = refresh_m3u8();

// remove the ts file.
segments->clear_expired(hls_cleanup);
segments->clear_expired(hls_cleanup, hls_delay_cleanup);

// check ret of refresh m3u8
if (err != srs_success) {
Expand Down Expand Up @@ -902,6 +906,9 @@ srs_error_t SrsHlsController::on_publish(SrsRequest* req)
string hls_key_file = _srs_config->get_hls_key_file(vhost);
string hls_key_file_path = _srs_config->get_hls_key_file_path(vhost);
string hls_key_url = _srs_config->get_hls_key_url(vhost);

// the time to delay cleanup ts files.
srs_utime_t hls_delay_cleanup = _srs_config->get_hls_delay_cleanup(vhost);

// TODO: FIXME: support load exists m3u8, to continue publish stream.
// for the HLS donot requires the EXT-X-MEDIA-SEQUENCE be monotonically increase.
Expand All @@ -912,7 +919,7 @@ srs_error_t SrsHlsController::on_publish(SrsRequest* req)

if ((err = muxer->update_config(req, entry_prefix, path, m3u8_file, ts_file, hls_fragment,
hls_window, ts_floor, hls_aof_ratio, cleanup, wait_keyframe,hls_keys,hls_fragments_per_key,
hls_key_file, hls_key_file_path, hls_key_url)) != srs_success ) {
hls_key_file, hls_key_file_path, hls_key_url, hls_delay_cleanup)) != srs_success ) {
return srs_error_wrap(err, "hls: update config");
}

Expand All @@ -923,9 +930,9 @@ srs_error_t SrsHlsController::on_publish(SrsRequest* req)
// This config item is used in SrsHls, we just log its value here.
bool hls_dts_directly = _srs_config->get_vhost_hls_dts_directly(req->vhost);

srs_trace("hls: win=%dms, frag=%dms, prefix=%s, path=%s, m3u8=%s, ts=%s, aof=%.2f, floor=%d, clean=%d, waitk=%d, dispose=%dms, dts_directly=%d",
srs_trace("hls: win=%dms, frag=%dms, prefix=%s, path=%s, m3u8=%s, ts=%s, aof=%.2f, floor=%d, clean=%d, waitk=%d, dispose=%dms, dts_directly=%d, delay_cleanup=%dms",
srsu2msi(hls_window), srsu2msi(hls_fragment), entry_prefix.c_str(), path.c_str(), m3u8_file.c_str(), ts_file.c_str(),
hls_aof_ratio, ts_floor, cleanup, wait_keyframe, srsu2msi(hls_dispose), hls_dts_directly);
hls_aof_ratio, ts_floor, cleanup, wait_keyframe, srsu2msi(hls_dispose), hls_dts_directly, srsu2msi(hls_delay_cleanup));

return err;
}
Expand Down
4 changes: 3 additions & 1 deletion trunk/src/app/srs_app_hls.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class SrsHlsMuxer
// TODO: FIXME: Use TBN 1000.
srs_utime_t hls_fragment;
srs_utime_t hls_window;
srs_utime_t hls_delay_cleanup;
SrsAsyncCallWorker* async;
private:
// Whether use floor algorithm for timestamp.
Expand Down Expand Up @@ -177,7 +178,8 @@ class SrsHlsMuxer
std::string path, std::string m3u8_file, std::string ts_file,
srs_utime_t fragment, srs_utime_t window, bool ts_floor, double aof_ratio,
bool cleanup, bool wait_keyframe, bool keys, int fragments_per_key,
std::string key_file, std::string key_file_path, std::string key_url);
std::string key_file, std::string key_file_path, std::string key_url,
srs_utime_t delay_cleanup);
// Open a new segment(a new ts file)
virtual srs_error_t segment_open();
virtual srs_error_t on_sequence_header();
Expand Down
76 changes: 76 additions & 0 deletions trunk/src/app/srs_app_hybrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
#include <srs_app_server.hpp>
#include <srs_app_config.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_service_st.hpp>
#include <srs_app_utility.hpp>

#include <unistd.h>

using namespace std;

extern SrsPps* _srs_pps_cids_get;
Expand Down Expand Up @@ -129,12 +132,16 @@ SrsHybridServer::SrsHybridServer()
timer5s_ = new SrsFastTimer("hybrid", 5 * SRS_UTIME_SECONDS);

clock_monitor_ = new SrsClockWallMonitor();

clock_cleanUp = new SrsCleanUpExpiredTs();
}

SrsHybridServer::~SrsHybridServer()
{
srs_freep(clock_monitor_);

srs_freep(clock_cleanUp);

srs_freep(timer20ms_);
srs_freep(timer100ms_);
srs_freep(timer1s_);
Expand Down Expand Up @@ -176,6 +183,7 @@ srs_error_t SrsHybridServer::initialize()

// Register some timers.
timer20ms_->subscribe(clock_monitor_);
timer100ms_->subscribe(clock_cleanUp);
timer5s_->subscribe(this);

// Initialize all hybrid servers.
Expand Down Expand Up @@ -381,5 +389,73 @@ srs_error_t SrsHybridServer::on_timer(srs_utime_t interval)
return err;
}

SrsTsFragment::SrsTsFragment(srs_utime_t time, std::string path)
{
die_at = srs_get_system_time() + time;
filepath = path;
}

SrsTsFragment::~SrsTsFragment()
{
}

bool SrsTsFragment::expired()
{
srs_utime_t now = srs_get_system_time();
if (now > die_at) {
return true;
}

return false;
}

srs_error_t SrsTsFragment::unlink_file()
{
srs_error_t err = srs_success;

if (::unlink(filepath.c_str()) < 0) {
return srs_error_new(ERROR_SYSTEM_FRAGMENT_UNLINK, "unlink %s", filepath.c_str());
}

return err;
}

SrsCleanUpExpiredTs::SrsCleanUpExpiredTs()
{
}

SrsCleanUpExpiredTs::~SrsCleanUpExpiredTs()
{
}

void SrsCleanUpExpiredTs::append(SrsTsFragment* fragment)
{
fragments.push_back(fragment);
}

srs_error_t SrsCleanUpExpiredTs::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;

std::list<SrsTsFragment*>::iterator it;
for (it = fragments.begin(); it != fragments.end();) {
SrsTsFragment* fragment = *it;

if (fragment->expired()) {
if ((err = fragment->unlink_file()) != srs_success) {
srs_warn("Unlink ts failed %s", srs_error_desc(err).c_str());
srs_freep(err);
}
srs_freep(fragment);

fragments.erase(it ++);
} else {
it ++;
}
}

return err;
}

SrsHybridServer* _srs_hybrid = NULL;

35 changes: 35 additions & 0 deletions trunk/src/app/srs_app_hybrid.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
#include <srs_core.hpp>

#include <vector>
#include <list>

#include <srs_app_hourglass.hpp>

class SrsServer;
class SrsServerAdapter;
class SrsWaitGroup;
class SrsCleanUpExpiredTs;

// The hibrid server interfaces, we could register many servers.
class ISrsHybridServer
Expand All @@ -42,6 +44,8 @@ class SrsHybridServer : public ISrsFastTimer
SrsFastTimer* timer1s_;
SrsFastTimer* timer5s_;
SrsClockWallMonitor* clock_monitor_;
public:
SrsCleanUpExpiredTs* clock_cleanUp;
public:
SrsHybridServer();
virtual ~SrsHybridServer();
Expand All @@ -62,6 +66,37 @@ class SrsHybridServer : public ISrsFastTimer
srs_error_t on_timer(srs_utime_t interval);
};

// The Ts fragment info.
class SrsTsFragment
{
private:
srs_utime_t die_at;
std::string filepath;
public:
SrsTsFragment(srs_utime_t time, std::string path);
virtual ~SrsTsFragment();
public:
// Unlink ts file when expired.
virtual bool expired();
// Unlink the fragment, to delete the file.
virtual srs_error_t unlink_file();
};

// To clean up expired Ts file.
class SrsCleanUpExpiredTs : public ISrsFastTimer
{
private:
std::list<SrsTsFragment*> fragments;
public:
SrsCleanUpExpiredTs();
virtual ~SrsCleanUpExpiredTs();
public:
virtual void append(SrsTsFragment* fragment);
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval);
};

extern SrsHybridServer* _srs_hybrid;

#endif