-
Notifications
You must be signed in to change notification settings - Fork 15
/
multitrack-conference-server.py
126 lines (102 loc) · 4.21 KB
/
multitrack-conference-server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.alert import Alert
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from flask import Flask, request
from selenium.webdriver.chrome.service import Service
class Browser:
def init(self, is_headless):
browser_options = Options()
browser_options.add_experimental_option("detach", True)
browser_options.add_argument("--use-fake-ui-for-media-stream")
browser_options.add_argument("--use-fake-device-for-media-stream")
browser_options.add_argument('--log-level=3')
browser_options.add_argument('--no-sandbox')
browser_options.add_argument('--disable-extensions')
browser_options.add_argument('--disable-gpu')
browser_options.add_argument('--disable-dev-shm-usage')
browser_options.add_argument('--disable-setuid-sandbox')
if is_headless:
browser_options.add_argument("--headless")
dc = DesiredCapabilities.CHROME.copy()
dc['goog:loggingPrefs'] = { 'browser':'ALL' }
service = Service(executable_path='/home/ubuntu/chromedriver')
#service = Service(executable_path='C:/WebDriver/chromedriver.exe')
self.driver = webdriver.Chrome(service=service, options=browser_options)
def open_in_new_tab(self, url, tab_id):
self.driver.switch_to.window(self.driver.window_handles[0])
self.driver.execute_script("window.open('about:blank', '"+tab_id+"');")
print (self.driver.window_handles)
self.driver.switch_to.window(tab_id)
print(url)
self.driver.get(url)
def get_element_by_id(self, id):
timeout = 5
try:
element_present = EC.element_to_be_clickable((By.ID, id))
WebDriverWait(self.driver, timeout).until(element_present)
except TimeoutException:
print ("Timed out waiting for page to load")
element = self.driver.find_element(By.ID, id)
return element
def write_to_element(self, element, text):
element.send_keys(text)
def click_element(self, element):
element.click()
def switch_to_tab(self, tab_id):
self.driver.switch_to.window(tab_id)
def close(self):
self.driver.close()
def close_all(self):
for handle in self.driver.window_handles:
self.driver.switch_to.window(handle)
self.driver.close()
url="https://test.antmedia.io:5443/LiveApp/conference.html"
#url="www.google.com"
app = Flask(__name__)
chrome = Browser()
chrome.init(True)
@app.route('/create', methods=['GET'])
def create():
room = request.args.get('room')
test = request.args.get('test')
participant = request.args.get('participant')
print("\n create for room:"+room+":"+participant+" in "+test)
chrome.open_in_new_tab(url+"?roomId="+room+"&streamId="+participant, participant)
return f'Room created', 200
@app.route('/join', methods=['GET'])
def join():
room = request.args.get('room')
test = request.args.get('test')
participant = request.args.get('participant')
print("\n join for room:"+room+":"+participant+" in "+test)
chrome.switch_to_tab(participant)
join_button = chrome.get_element_by_id("join_publish_button")
join_button.click()
return f'Joined the room', 200
@app.route('/leave', methods=['GET'])
def leave():
room = request.args.get('room')
test = request.args.get('test')
participant = request.args.get('participant')
print("\n leave for room:"+room+":"+participant+" in "+test)
chrome.switch_to_tab(participant)
leave_button = chrome.get_element_by_id("stop_publish_button")
leave_button.click()
return f'Left the room', 200
@app.route('/delete', methods=['GET'])
def delete():
room = request.args.get('room')
test = request.args.get('test')
participant = request.args.get('participant')
print("\n delete for room:"+room+":"+participant+" in "+test)
chrome.switch_to_tab(participant)
chrome.close()
return f'Tab closed', 200
if __name__ == '__main__':
app.run(host='0.0.0.0', port=3030)