199 lines
5.9 KiB
Python
199 lines
5.9 KiB
Python
import sys
|
|
import os
|
|
import re
|
|
import tempfile
|
|
import shutil
|
|
import subprocess
|
|
import trio
|
|
from trio_cdp import open_cdp, target
|
|
|
|
# Candidate Chromium/Chrome executables for the current platform, in order of
# preference. Entries containing a path separator are checked as files; bare
# names are looked up on PATH (see Chromium.launch).
if sys.platform in ('win32', 'cygwin'):
    # Probe the per-user install first, then the system-wide locations.
    # Unset variables are skipped so that importing this module can never
    # fail with a KeyError; an empty list simply means "nothing found" later.
    CHROMIUM_BINARIES = [
        r'{}\Google\Chrome\Application\chrome.exe'.format(os.environ[name])
        for name in ('LocalAppData', 'ProgramFiles', 'ProgramFiles(x86)')
        if os.environ.get(name)
    ]
elif sys.platform == 'darwin':
    CHROMIUM_BINARIES = [
        '/Applications/Chromium.app/Contents/MacOS/Chromium',
        '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome',
    ]
else:
    CHROMIUM_BINARIES = ['chromium-browser', 'chromium', 'google-chrome', 'chrome']
|
|
|
|
# Command-line switch tables consumed by Chromium.launch(). Each entry is the
# switch name without its leading dashes; launch() adds the prefix.

# Emitted verbatim as `--<entry>`.
CHROMIUM_ARGS = {
    'force-device-scale-factor=1',
    'hide-scrollbars',
    'incognito',
    'no-first-run',
    'password-store=basic',
    'use-mock-keychain',
}

# Emitted as `--enable-<entry>`.
CHROMIUM_ENABLES = {
    'automation',
    'features=NetworkService,NetworkServiceInProcess',
}

# Emitted as `--disable-<entry>`.
CHROMIUM_DISABLES = {
    'background-networking',
    'background-timer-throttling',
    'backgrounding-occluded-windows',
    'breakpad',
    'client-side-phishing-detection',
    'component-extensions-with-background-pages',
    'crash-reporter',
    'default-apps',
    'dev-shm-usage',
    'extensions',
    'features=TranslateUI',
    'gaia-services',
    'hang-monitor',
    'ipc-flooding-protection',
    'login-animations',
    'login-screen-apps',
    'notifications',
    'popup-blocking',
    'prompt-on-repost',
    'renderer-backgrounding',
    'search-geolocation-disclosure',
    'sync',
}

# Emitted as `--<entry>`, but only when the browser is launched headless.
CHROMIUM_HEADLESS_ARGS = {'headless'}
|
|
|
|
|
|
# From: https://stackoverflow.com/questions/53575979/how-can-i-read-one-line-at-a-time-from-a-trio-receivestream
|
|
class LineReader:
    """Read newline-terminated lines from a byte stream.

    ``stream`` must provide an awaitable ``receive_some()`` that returns a
    bytes-like chunk, or an empty/falsy value at end of stream.

    Lines are returned as ``bytearray`` objects including the trailing
    ``b'\\n'``. If the stream ends while unterminated data is still buffered,
    that remainder is returned as a final line (without a newline) instead of
    being dropped.
    """

    def __init__(self, stream, max_line_length=16384):
        self.stream = stream
        # The buffer is shared with the generator so readline() can flush
        # whatever is left in it once the stream reports end-of-file.
        self._buf = bytearray()
        self._line_generator = self.generate_lines(max_line_length, self._buf)

    @staticmethod
    def generate_lines(max_line_length, buf=None):
        """Return a generator that splits fed data into lines.

        Protocol: ``next(gen)`` / ``gen.send(data)`` yields the next complete
        line (newline included), or ``None`` when more data is needed.

        Args:
            max_line_length: Raise once more than this many bytes are
                buffered without containing a newline.
            buf: Optional ``bytearray`` to use as the internal buffer;
                a private one is created when omitted.

        Raises:
            ValueError: If the buffered data exceeds ``max_line_length``
                without a newline.
        """
        if buf is None:
            buf = bytearray()
        find_start = 0
        while True:
            newline_idx = buf.find(b'\n', find_start)
            if newline_idx < 0:
                # no b'\n' found in buf
                if len(buf) > max_line_length:
                    raise ValueError("line too long")
                # next time, start the search where this one left off
                find_start = len(buf)
                more_data = yield
            else:
                # b'\n' found in buf so return the line and move up buf
                line = buf[:newline_idx+1]
                # Update the buffer in place, to take advantage of bytearray's
                # optimized delete-from-beginning feature.
                del buf[:newline_idx+1]
                # next time, start the search from the beginning
                find_start = 0
                more_data = yield line

            if more_data is not None:
                buf += bytes(more_data)

    async def readline(self):
        """Return the next line, or ``None`` once the stream is exhausted."""
        # Drain lines that are already buffered before touching the stream.
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some()
            if not more_data:
                # End of stream: hand back any unterminated trailing data
                # exactly once, then report exhaustion.
                if self._buf:
                    line = bytearray(self._buf)
                    del self._buf[:]
                    return line
                return None
            line = self._line_generator.send(more_data)
        return line

    async def readlines(self):
        """Asynchronously iterate over lines until the stream is exhausted."""
        while True:
            line = await self.readline()
            if line is None:
                break
            yield line
|
|
|
|
|
|
class Chromium:
    """Launch a throw-away Chromium instance and connect to it via DevTools.

    Each launch uses a fresh temporary profile directory and asks the browser
    to pick a free remote-debugging port; the endpoint URL is then read from
    the browser's stderr.

    Args:
        binary: Path or name of the browser executable. When falsy, the
            first usable entry of ``CHROMIUM_BINARIES`` is used.
        headless: Whether to add the ``CHROMIUM_HEADLESS_ARGS`` switches.
        args: Extra command-line arguments. The sequence is copied, so the
            caller's list is never modified.
    """

    # Line printed on stderr once the DevTools endpoint is up; group 1 is
    # the endpoint URL.
    DEVTOOLS_PATTERN = re.compile(r'^DevTools listening on (\S+)$')

    # Seconds close() waits after terminate() before force-killing.
    _TERMINATE_TIMEOUT = 5

    def __init__(self, binary=None, headless=True, args=None):
        self.binary = binary
        self.headless = headless
        # Copy: a shared default or the caller's own list must not be mutated.
        self.args = list(args) if args is not None else []
        self.profile_dir = None
        self.process = None

    def __del__(self):
        # close() is a coroutine and cannot be awaited from a finalizer, so
        # fall back to synchronous best-effort cleanup.
        self._release()

    def _release(self):
        """Synchronously terminate the browser and drop the profile directory."""
        # getattr(): __del__ may run on an instance whose __init__ did not finish.
        process = getattr(self, 'process', None)
        profile_dir = getattr(self, 'profile_dir', None)
        self.process = None
        self.profile_dir = None
        if process is not None:
            try:
                process.terminate()
            except OSError:
                # Best effort: the process may already be gone.
                pass
        if profile_dir is not None:
            try:
                profile_dir.cleanup()
            except OSError:
                # Best effort: the browser may still hold files while exiting.
                pass

    @staticmethod
    def _find_binary():
        """Return the first usable entry of CHROMIUM_BINARIES.

        Raises:
            ValueError: If no candidate exists.
        """
        for path in CHROMIUM_BINARIES:
            if '/' in path or '\\' in path:
                # Looks like a filesystem path: it must exist as a file.
                if os.path.isfile(path):
                    return path
            elif shutil.which(path):
                # Bare command name: it must be resolvable on PATH.
                return path
        raise ValueError('could not find Chromium executable!')

    def _build_args(self, profile_path):
        """Return the full argument list (without the executable itself)."""
        result = ['--disable-{}'.format(x) for x in CHROMIUM_DISABLES]
        result += ['--enable-{}'.format(x) for x in CHROMIUM_ENABLES]
        result += ['--{}'.format(x) for x in CHROMIUM_ARGS]
        result += [
            '--user-data-dir={}'.format(profile_path),
            # Port 0 lets the browser choose a free port; the actual endpoint
            # is parsed from stderr in connect().
            '--remote-debugging-port=0',
        ]
        if self.headless:
            result += ['--{}'.format(x) for x in CHROMIUM_HEADLESS_ARGS]
        result += self.args
        # If the user supplied only switches (no URL), open a blank page.
        # Built into the command only, so self.args stays as the user gave it.
        if all(x.startswith('-') for x in self.args):
            result.append('about:blank')
        return result

    @classmethod
    def _parse_devtools_url(cls, line):
        """Return the DevTools URL announced by a stderr line, else None."""
        # strip(): tolerate '\r\n' line endings; errors='replace': one stray
        # non-UTF-8 log byte must not abort the connection attempt.
        text = bytes(line).decode('utf-8', errors='replace').strip()
        match = cls.DEVTOOLS_PATTERN.match(text)
        return match.group(1) if match else None

    async def launch(self):
        """Start the browser (closing any previous one) and return the process.

        Raises:
            ValueError: If no executable was configured and none was found.
        """
        if self.process:
            await self.close()

        # Resolve the binary first so that a failure here does not leave a
        # temporary profile directory behind.
        binary = self.binary if self.binary else self._find_binary()

        # Newer trio releases provide open_process under trio.lowlevel;
        # fall back to the top-level name for older releases.
        lowlevel = getattr(trio, 'lowlevel', None)
        open_process = getattr(lowlevel, 'open_process', None) or trio.open_process

        profile_dir = tempfile.TemporaryDirectory()
        command = [binary] + self._build_args(profile_dir.name)
        try:
            process = await open_process(command, stderr=subprocess.PIPE)
        except BaseException:
            # Spawn failed or was cancelled: do not leak the profile directory.
            profile_dir.cleanup()
            raise

        self.profile_dir = profile_dir
        self.process = process
        return process

    async def close(self):
        """Stop the browser, if running, and remove its profile directory."""
        process = self.process
        profile_dir = self.profile_dir
        if not process:
            return
        self.process = None
        self.profile_dir = None
        try:
            process.terminate()
            # Wait for the process to exit before deleting its profile, so
            # the directory is not removed while the browser still writes to it.
            with trio.move_on_after(self._TERMINATE_TIMEOUT):
                await process.wait()
            if process.returncode is None:
                process.kill()
                await process.wait()
        finally:
            if profile_dir is not None:
                profile_dir.cleanup()

    async def connect(self):
        """Launch the browser and open a CDP connection to it.

        Returns:
            The result of ``open_cdp(url)``; it is not awaited or entered
            here (presumably an async context manager -- confirm against
            the trio_cdp documentation).

        Raises:
            ValueError: If stderr ends without announcing a DevTools URL.
        """
        process = await self.launch()
        reader = LineReader(process.stderr)
        cdp_url = None
        async for line in reader.readlines():
            cdp_url = self._parse_devtools_url(line)
            if cdp_url:
                break
        # NOTE(review): stderr is not read any further after this point.

        if cdp_url is None:
            # Do not leave a browser running that we cannot talk to.
            await self.close()
            raise ValueError('could not connect to Chromium')

        return open_cdp(cdp_url)

    async def select(self, conn):
        """Open a session on the first unattached, non-DevTools page target.

        Args:
            conn: Connection object providing ``open_session(target_id)``.

        Returns:
            The result of ``conn.open_session(target_id)`` (not awaited here).

        Raises:
            ValueError: If no suitable target exists.
        """
        # NOTE(review): get_targets() is called without passing conn;
        # presumably trio_cdp resolves the active connection itself -- confirm.
        targets = await target.get_targets()
        for t in targets:
            if t.attached:
                continue
            if t.type_ != 'page':
                continue
            if t.url.startswith('devtools://'):
                continue
            target_id = t.target_id
            break
        else:
            raise ValueError('could not find target')

        # Create a new session with the chosen target.
        return conn.open_session(target_id)
|