from multiprocessing import pool
import os
import cv2
import shutil
import tqdm
import glob
import subprocess
import multiprocessing
import numpy as np
import matplotlib
matplotlib.use('Agg') # For file output only, no GUI
import matplotlib.pyplot as plt
from skimage.measure import ransac, LineModelND
import sys
# Add the absolute path to the ./src folder
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../', 'FFMpeg')))
from Jpg2Video import create_video_from_images
def fit_and_rotate_image(image_path: os.PathLike,
experiment: str = None,
results: bool = True,
focus_ratio: float = 0.3) -> tuple[float, tuple, np.ndarray]:
"""
Fits a robust line to the bottom edges of an image and rotates the image to level the surface.
Args:
image_path (os.PathLike): Path to the input image.
experiment (str, optional): Experiment name for saving results.
results (bool, optional): If True, saves a diagnostic plot.
focus_ratio (float, optional): Portion of the image height to analyze from the bottom. Default 0.3.
Returns:
tuple:
- angle (float): Rotation angle in degrees.
- image_shape (tuple): Original image shape.
- rotated_image (np.ndarray): Rotated image.
"""
# image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
# # loc_image = image[:, :-410]
# # Detect edges using Canny
# edges = cv2.Canny(image, 50, 150)
# # Get edge coordinates
# y_indices, x_indices = np.where(edges > 0)
# points = np.column_stack((x_indices, y_indices))
# # Apply RANSAC to fit a line
# model, inliers = ransac(points, LineModelND, min_samples=2, residual_threshold=2, max_trials=1000)
# # Get line parameters
# line_x = np.array([min(x_indices), max(x_indices)])
# line_y = model.predict_y(line_x)
# # Compute angle of rotation
# dx = line_x[1] - line_x[0]
# dy = line_y[1] - line_y[0]
# angle = np.degrees(np.arctan2(dy, dx))
# # Rotate image to level the line
# (h, w) = image.shape[:2]
# center = (w // 2, h // 2)
# rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
# rotated_image = cv2.warpAffine(image, rotation_matrix, (w, h), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT, borderValue=255)
# Load grayscale image
image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
h, w = image.shape[:2]
# Focus on the bottom region
focus_height = int(h * focus_ratio)
bottom_region = image[h - focus_height:h, :]
# Preprocess to stabilize edges
blurred = cv2.GaussianBlur(bottom_region, (5, 5), 0)
edges = cv2.Canny(blurred, 50, 150)
# Get edge coordinates
y_indices, x_indices = np.where(edges > 0)
if len(x_indices) < 2:
raise ValueError(f"Not enough edge points detected in {image_path}")
points = np.column_stack((x_indices, y_indices))
# Fit robust line using RANSAC
model, inliers = ransac(
points, LineModelND,
min_samples=2,
residual_threshold=1.0, # tighter fit
max_trials=5000 # more attempts
)
# Compute line endpoints
line_x = np.array([min(x_indices), max(x_indices)])
line_y = model.predict_y(line_x)
# Adjust for cropped region
line_y += (h - focus_height)
# Compute angle
dx = line_x[1] - line_x[0]
dy = line_y[1] - line_y[0]
angle = np.degrees(np.arctan2(dy, dx))
# Rotate around bottom-center to preserve surface alignment
center = (w // 2, h - 1)
rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
rotated_image = cv2.warpAffine(image, rotation_matrix, (w, h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=255)
# Optional visualization
if results:
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.imshow(image, cmap='gray')
plt.plot(line_x, line_y, color='red', linewidth=2)
plt.title("Detected Line")
plt.subplot(1, 2, 2)
plt.imshow(rotated_image, cmap='gray')
plt.title("Rotated Image")
save_dir = os.path.join(os.path.dirname(image_path), "rotation")
os.makedirs(save_dir, exist_ok=True)
plt.savefig(os.path.join(save_dir, "result.png"), dpi=300)
plt.close()
return angle, image.shape, rotated_image
def fit_image(image: cv2.Mat, black_base_line=10):
"""
Fits a line to the detected edges in a grayscale image using RANSAC, and computes
the vertical offset from the fitted line to a given black baseline.
Args:
image (cv2.Mat): Grayscale input image.
black_base_line (int, optional): Reference baseline offset in pixels. Defaults to 10.
Returns:
int: Vertical offset (in pixels) between the fitted line's center height and the black baseline.
"""
# Detect edges using Canny edge detector
edges = cv2.Canny(image, 50, 150)
# Find coordinates of non-zero (edge) pixels
y_indices, x_indices = np.where(edges > 0)
points = np.column_stack((x_indices, y_indices)) # Shape: (N, 2)
# Fit a robust line to the edge points using RANSAC (to handle outliers)
model, inliers = ransac(points, LineModelND, min_samples=2,
residual_threshold=2, max_trials=1000)
# Define X-range of the line (min to max X in the edge points)
line_x = np.array([min(x_indices), max(x_indices)])
# Predict corresponding Y values from the fitted line model
line_y = model.predict_y(line_x)
# Compute angle of the line (not used in return, but may be useful for debugging)
dx = line_x[1] - line_x[0]
dy = line_y[1] - line_y[0]
angle = np.degrees(np.arctan2(dy, dx)) # Angle of the fitted line in degrees
# Compute average height of the line and subtract the baseline
return int((line_y[1] + line_y[0]) // 2) - black_base_line
def line_finder(image:cv2.Mat, rotation_matrix:cv2.Mat, black_base_line:int = 10) -> int:
"""
Finds the height of the line in the image after applying a rotation matrix.
Args:
image (cv2.Mat): Input image in **grayscale**.
rotation_matrix (cv2.Mat): Rotation matrix to apply to the image.
black_base_line (int): The baseline height to subtract from the line height.
Returns:
int: Height of the line in the rotated image.
"""
image = cv2.imread(image, cv2.IMREAD_GRAYSCALE)
(h, w) = image.shape
# rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
rotated_image = cv2.warpAffine(image, rotation_matrix, (w, h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=255)
cropped_height = fit_image(rotated_image, black_base_line=black_base_line)
return cropped_height
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../', )))
from BottomRowUnifier import bottom_row_unifierGRAY, bottom_row_unifier
def process_image(filepath: str, rotation_matrix: np.ndarray, cropped_height, output_path: str = None,
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (13, 13))) -> None:
"""
Processes an image by applying a rotation matrix and saving the result.
Args:
file (str): Path to the input image file.
rotation_matrix (cv2.Mat): Rotation matrix to apply to the image.
Returns:
None: The function saves the processed image to the same path.
Calling image[cropped_height+10:, :] = 0 before image rotation make weird artifacts
"""
image = cv2.imread(filepath, cv2.IMREAD_GRAYSCALE)
(w, h) = image.shape[:2]
if output_path is None:
output_path = os.path.dirname(filepath).replace("frames", "frames_rotated")
if not os.path.isdir(output_path):
os.makedirs(output_path, exist_ok=True)
# rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
rotated_image = cv2.warpAffine(image, rotation_matrix, (h, w ),
flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT, borderValue=255)
rotated_image[cropped_height+10:, :] = 0 # Set the top part of the image to black
# TODO: normalize bottom row
_rotated_image = bottom_row_unifierGRAY(rotated_image, target_height=130)
## Close operation fills small dark holes # Kernel size depends on spot size
# _rotated_image = cv2.morphologyEx(_rotated_image, cv2.MORPH_CLOSE, kernel)
cv2.imwrite(os.path.join(output_path, os.path.basename(filepath)), _rotated_image)
def folderBaseLineNormalizer(experiment, output_path= None):
files = sorted(glob.glob(os.path.join(experiment, "*.png")))
if output_path is None:
output_path = os.path.dirname(files[0]).replace("frames", "frames_rotated")
if len(glob.glob(os.path.join(output_path, "*.png"))) == len(files):
# print(f"All images in {output_path} are already processed.")
# return 1
pass
else:
if not os.path.isdir(output_path):
os.makedirs(output_path, exist_ok=True)
image = cv2.imread(os.path.join(experiment, files[2]), cv2.IMREAD_GRAYSCALE)
(h, w) = image.shape[:2]
center = (w // 2, h // 2)
angle,_shape, rotated_image = fit_and_rotate_image(os.path.join(experiment, files[2]),results=True)
rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
with multiprocessing.Pool(processes=int(multiprocessing.cpu_count()*0.75)) as pool: #
cropped_height_list = pool.starmap(line_finder, [(file, rotation_matrix) for file in files])
cropped_height = np.array(cropped_height_list).mean().astype(np.int16)
rotation_matrix = cv2.getRotationMatrix2D((w // 2, cropped_height+10), angle, 1.0)
# for file in files:
# process_image(file, rotation_matrix, cropped_height)
# Rotate and save all images in parallel
# with multiprocessing.Pool(processes=int(multiprocessing.cpu_count()*0.75)) as pool:
# for _ in tqdm.tqdm(pool.starmap(process_image, [(file, rotation_matrix) for file in files]), total=len(files)):
# pass
with multiprocessing.Pool(processes=int(multiprocessing.cpu_count() * 0.75)) as pool:
pool.starmap(process_image, [(file, rotation_matrix,cropped_height) for file in files])
if __name__ == "__main__":
"""
I assumed Images are gray
"""
import shutil
for experiment in tqdm.tqdm(sorted(glob.glob("/media/d2u25/Dont/frames/*/*/*"))):
try:
video_experiment = experiment.replace("frames", "VideoProcess")
outputpath = os.path.join(video_experiment, "result.mp4")
if os.path.isfile(outputpath):
# print(f"Video already exists for {experiment}. Skipping...")
continue
folderBaseLineNormalizer(experiment)
if not os.path.isdir(video_experiment):
os.makedirs(video_experiment, exist_ok=True)
create_video_from_images(experiment.replace("frames", "frames_rotated"),outputpath, extension="png")
shutil.rmtree(experiment.replace("frames", "frames_rotated"))
except Exception as e:
print(f"Error processing {experiment}: {e}")
continue
# break
# angle, imageDim, rotated_image = fit_and_rotate_image("/media/d2u25/Dont/frames/280/S2-SNr2.1_D/T528_01/frame_000001.png", "T738_01", results=True)
# print(f"Rotation angle: {angle} degrees")
# '/media/d2u25/Dont/frames_rotated_rotated/285/S3-SNr3.06_D/T547_12'
# /media/d2u25/Dont/frames_rotated_rotated/280/S2-SNr2.1_D/T528_15