# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import os
from urllib.parse import quote
from marionette_driver import Wait, errors
from marionette_harness import MarionetteTestCase, WindowManagerMixin
def inline(doc):
return f"data:text/html;charset=utf-8,{quote(doc)}"
class TestSessionRestoreLogging(WindowManagerMixin, MarionetteTestCase):
def setUp(self):
super(TestSessionRestoreLogging, self).setUp()
self.marionette.enforce_gecko_prefs(
{
"browser.sessionstore.loglevel": "Debug",
"browser.sessionstore.log.appender.file.logOnSuccess": True,
}
)
def tearDown(self):
try:
# Create a fresh profile for subsequent tests.
self.marionette.restart(in_app=False, clean=True)
finally:
super(TestSessionRestoreLogging, self).tearDown()
def getSessionFilePath(self):
profilePath = self.marionette.instance.profile.profile
assert profilePath is not None
return os.path.join(profilePath, "sessionstore.jsonlz4")
def getLogDirectoryPath(self):
profilePath = self.marionette.instance.profile.profile
assert profilePath is not None
return os.path.join(profilePath, "sessionstore-logs")
def cleanupLogDirectory(self):
dirPath = self.getLogDirectoryPath()
for fname in self.getLogFiles():
fpath = os.path.join(dirPath, fname)
os.remove(fpath)
def getLogFiles(self, matchstr=""):
dirPath = self.getLogDirectoryPath()
fileNames = []
if os.path.isdir(dirPath):
fileNames = [entry.name for entry in os.scandir(dirPath) if entry.is_file()]
# Optionally filter by substring
if len(matchstr) > 0 and len(fileNames) > 0:
fileNames = filter(lambda name: matchstr in name, fileNames)
return fileNames
def getMostRecentLogFile(self, matchstr=""):
dirPath = self.getLogDirectoryPath()
# Build full paths
files = [os.path.join(dirPath, f) for f in self.getLogFiles(matchstr)]
# Get the newest file by creation time
return max(files, key=os.path.getctime) if len(files) > 0 else None
def openAndCloseSaveworthyWindow(self):
urls = (
inline("""
ipsum
"""),
inline("""dolor
"""),
)
self.marionette.set_context("chrome")
origWindow = self.marionette.current_window_handle
win = self.open_window(private=False)
self.marionette.switch_to_window(win)
with self.marionette.using_context("content"):
for index, url in enumerate(urls):
if index > 0:
tab = self.open_tab()
self.marionette.switch_to_window(tab)
self.marionette.navigate(url)
self.marionette.execute_script("window.close()")
self.marionette.switch_to_window(origWindow)
def getLineCount(self, logFile):
with open(logFile) as f:
return sum(1 for _ in f)
def test_per_startup_logfile_creation(self):
self.marionette.quit()
self.cleanupLogDirectory()
self.marionette.start_session()
self.marionette.set_context("chrome")
self.marionette.quit()
# There were no errors so we just expect the success log file to have been created
self.assertEqual(
len(self.getLogFiles()),
1,
"Exactly one log file was created after a restart and quit",
)
self.marionette.start_session()
def test_errors_flush_to_disk(self):
self.marionette.enforce_gecko_prefs(
{
"browser.sessionstore.log.appender.file.logOnSuccess": False,
}
)
self.marionette.quit()
sessionFile = self.getSessionFilePath()
self.assertTrue(
os.path.isfile(sessionFile),
"The sessionstore.jsonlz4 file exists in the profile directory",
)
# replace the contents with nonsense so we get a not-readable error on startup
with open(sessionFile, "wb") as f:
f.write(b"\x00\xFF\xABgarbageDATA")
self.marionette.start_session()
self.marionette.set_context("chrome")
errorLogFile = self.getMostRecentLogFile("error-")
self.assertTrue(errorLogFile, "We logged errors immediately")
if os.path.isfile(sessionFile):
os.remove(sessionFile)
def test_periodic_flush_to_disk(self):
# enable debug logging and verify that log messages are batched
# and only flush when the interval has passed
logFile = self.getMostRecentLogFile()
logFileCount = len(self.getLogFiles())
self.assertTrue(logFile, "A log file already exists")
self.assertEqual(logFileCount, 1, "A single 'success' log file exists")
startLineCount = self.getLineCount(logFile)
# open and close a window
self.openAndCloseSaveworthyWindow()
self.assertEqual(
self.getLineCount(logFile),
startLineCount,
"Log file line count unchanged; debug messages should be buffered and not yet flushed to disk",
)
# Set the flush interval to 1 second and open+close another window to trigger some
# debug log messages
self.marionette.set_pref("browser.sessionstore.logFlushIntervalSeconds", 1)
origWindow = self.marionette.current_window_handle
self.openAndCloseSaveworthyWindow()
try:
wait = Wait(self.marionette, timeout=5, interval=0.1)
wait.until(
lambda _: self.getLineCount(logFile) > startLineCount,
message="Waiting for line count in the log file to grow",
)
newLineCount = self.getLineCount(logFile)
self.assertTrue(
newLineCount > startLineCount,
f"{newLineCount - startLineCount} lines got flushed to the log file",
)
except errors.TimeoutException as e:
message = (
f"{e.message}. Line count didn't grow as expected in log file {logFile}"
)
raise errors.TimeoutException(message)
finally:
self.marionette.switch_to_window(origWindow)
self.marionette.clear_pref("browser.sessionstore.logFlushIntervalSeconds")
self.assertTrue(
self.getLineCount(logFile) > startLineCount,
"Debug log messages got flushed to disk",
)
#
self.assertEqual(
len(self.getLogFiles()),
logFileCount,
"We just appended to the one log file and didn't create any others",
)