{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Find rows in a column\n", "\n", "Use Tesseract to separate columns into rows." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import cv2\n", "import pandas as pd\n", "import numpy as np\n", "try:\n", " from PIL import Image\n", "except ImportError:\n", " import Image\n", "import pytesseract\n", "from statistics import mean\n", "import math\n", "import statistics\n", "import re\n", "import os\n", "import tempfile\n", "from fuzzywuzzy import fuzz\n", "from tqdm.auto import tqdm\n", "from pathlib import Path" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# These OCR image preprocessing steps are based on https://stackoverflow.com/a/43493383\n", "# I don't really understand why this particular combination of filters works, but it does seem to improve OCR results\n", "\n", "BINARY_THRESHOLD = 200\n", "\n", "def process_image_for_ocr(file_path):\n", " # TODO : Implement using opencv\n", " temp_filename = set_image_dpi(file_path)\n", " im_new = remove_noise_and_smooth(temp_filename)\n", " return im_new\n", "\n", "\n", "def set_image_dpi(file_path):\n", " im = Image.open(file_path)\n", " #length_x, width_y = im.size\n", " #factor = max(1, int(IMAGE_SIZE / length_x))\n", " #size = factor * length_x, factor * width_y\n", " # size = (1800, 1800)\n", " #im_resized = im.resize(size, Image.ANTIALIAS)\n", " temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg')\n", " temp_filename = temp_file.name\n", " im.save(temp_filename, dpi=(300, 300))\n", " return temp_filename\n", "\n", "\n", "def image_smoothening(img):\n", " ret1, th1 = cv2.threshold(img, BINARY_THRESHOLD, 255, cv2.THRESH_BINARY)\n", " ret2, th2 = cv2.threshold(th1, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)\n", " blur = cv2.GaussianBlur(th2, (1, 1), 0)\n", " ret3, th3 = cv2.threshold(blur, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)\n", " return th3\n", "\n", "\n", "def remove_noise_and_smooth(file_name):\n", " img = cv2.imread(file_name, 0)\n", " filtered = cv2.adaptiveThreshold(img.astype(np.uint8), 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 41, 3)\n", " kernel = np.ones((1, 1), np.uint8)\n", " opening = cv2.morphologyEx(filtered, cv2.MORPH_OPEN, kernel)\n", " closing = cv2.morphologyEx(opening, cv2.MORPH_CLOSE, kernel)\n", " img = image_smoothening(img)\n", " or_image = cv2.bitwise_or(img, closing)\n", " return or_image" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "def get_ocr(image_path):\n", " temp_img = process_image_for_ocr(image_path)\n", " df = pytesseract.image_to_data(temp_img, config='--psm 4 --oem 1 -l eng', output_type=pytesseract.Output.DATAFRAME)\n", " return df\n", "\n", "\"\"\"\n", "def find_col_width(df):\n", " for confidence in reversed(range(60, 110, 10)):\n", " for heading in ['buyers', 'closing', 'quotations']:\n", " for word in df.loc[(df['level'] == 5) & (df['top'] < 100)].sort_values(by='top', ascending=False).itertuples():\n", " # print(word.text.lower())\n", " if len(str(word.text)) > 5 and fuzz.partial_ratio(heading, word.text.lower()) >= confidence:\n", " # print(word)\n", " # print(fuzz.ratio('buyers', word.text.lower()))\n", " if word.left > 250:\n", " return word.left\n", " return None\n", "\"\"\"\n", "\n", "def find_col_width(df):\n", " candidates = []\n", " for confidence in reversed(range(80, 110, 10)):\n", " for heading in ['buyers', 'closing', 'quotations']:\n", " for word in df.loc[(df['level'] == 5) & (df['left'] < 1500)].sort_values(by='top').itertuples():\n", " # print(word.text.lower())\n", " if len(str(word.text)) > 5 and fuzz.partial_ratio(heading, word.text.lower()) >= confidence:\n", " # print(word)\n", " # print(fuzz.ratio('buyers', word.text.lower()))\n", " if word.left > 625:\n", " candidates.append(word.left)\n", " try:\n", " lowest = sorted(candidates)[0] - 10\n", " except IndexError:\n", " lowest = None\n", " return lowest\n", "\n", "def save_row(img, words, word_top, word_height, word_left, col_width, width, row_file):\n", " new_img = img.copy()\n", " top = words['top'].min()\n", " height = words['height'].max()\n", " left = 0\n", " if word_height:\n", " # print(top, top+height, word_top+word_height)\n", " cv2.line(new_img, (word_left, word_top + word_height), (col_width, word_top + word_height), (255,0,0), 1)\n", " row = new_img[max(0, word_top - 20):word_top + word_height + 20, left: word_left + width]\n", " cv2.imwrite(str(row_file), row)\n", "\n", "def find_text(vol_dir, image_path, save_markup=False):\n", " col_data = []\n", " vol_id = re.search(r'(AU NBAC N193-\\d+)', str(image_path)).group(1)\n", " # vol_id = 'AU NBAC N193-001'\n", " # print(vol_id)\n", " image_name = os.path.basename(image_path)\n", " page_id, col_id = re.search(r'N193-\\d+_(\\d+)\\.*-col-(\\d+).jpg', image_name).groups()\n", " page_id = int(page_id)\n", " col_id = int(col_id)\n", " # print(page_id, col_id)\n", " # output_path = Path(vol_dir, 'rows')\n", " output_path = Path(f'/Volumes/bigdata/mydata/stockexchange/rows/{vol_id}')\n", " output_path.mkdir(parents=True, exist_ok=True)\n", " #rowcol_path = Path(output_path, 'cols')\n", " #rowcol_path.mkdir(parents=True, exist_ok=True)\n", " df = get_ocr(image_path)\n", " img = cv2.imread(str(image_path))\n", " h, w = img.shape[:2]\n", " col_width = find_col_width(df)\n", " if not col_width:\n", " # col_width = 400\n", " col_width = 1000\n", " # print(col_width)\n", " if save_markup:\n", " new_img = np.zeros((h, w * 2, 3), np.uint8)\n", " new_img[:] = (255, 255, 255)\n", " new_img[0:h,0:w] = img\n", " cv2.line(new_img,(col_width, 0),(col_width, h),(0,0,255),1)\n", " ft = cv2.freetype.createFreeType2()\n", " ft.loadFontData(fontFileName='/Library/Fonts/Arial Unicode.ttf', id=0)\n", " row_id = 0\n", " for para, lines in df.loc[(df['level'] == 5)].groupby(by=['block_num', 'par_num']):\n", " for line, words in lines.groupby(by=['line_num']):\n", " left = 10000\n", " right = 0\n", " top = 10000\n", " height = 0\n", " heights = []\n", " tops = []\n", " name_parts = []\n", " for word in words.itertuples():\n", " # Make sure it's not just nonsense\n", " cleaned_word = re.sub(r'[^&%\\(\\)\\\"\\w\\s\\/\\-]', '', str(word.text))\n", " if cleaned_word and not cleaned_word.isspace() and (word.left + word.width) < (col_width + 20):\n", " name_parts.append(str(word.text))\n", " if word.left < left:\n", " left = word.left\n", " if word.top < top:\n", " top = word.top\n", " if word.height > height:\n", " height = word.height\n", " if word.left + word.width > right:\n", " right = word.left + word.width\n", " tops.append(word.top)\n", " heights.append(word.height)\n", " height = int(round(statistics.mean(heights)))\n", " top = int(round(statistics.mean(tops)))\n", " # row_file = Path(output_path, f'{image_name[:-4].replace(\".\", \"\")}-row-{row_id}.jpg')\n", " # save_row(img, words, top, height, left, col_width, w, row_file)\n", " if name_parts:\n", " name_string = ' '.join(name_parts).replace('”', '\"').replace('»', '\"').replace('’', \"'\")\n", " # print(name_string)\n", " # Removes non-word characters & most punctuation\n", " cleaned_name = re.sub(r'[^&%\\(\\)\\\"\\w\\s\\/\\-\\']', '', name_string)\n", " # OCR seems to turn dots into these words (and perhaps others)\n", " cleaned_name = re.sub(r'\\s*\\b[on,an,we,ee,oe,os,vs,\\s]+\\s*$', '', cleaned_name).strip()\n", " if cleaned_name and not cleaned_name.isspace():\n", " # print(left, top, height, cleaned_name)\n", " col_data.append({'vol_id': vol_id, 'page_id': page_id, 'col_id': col_id, 'row_id': row_id, 'text': cleaned_name, 'left': left, 'top': top, 'height': height, 'right': right })\n", " if save_markup:\n", " cv2.line(new_img,(0, top+height),(w * 2, top+height),(255,0,0),2)\n", " ft.putText(new_img, cleaned_name, (w + 20, top + height), fontHeight=40, color=(0, 0, 0), thickness=-1, line_type=cv2.LINE_AA, bottomLeftOrigin=True)\n", " row_file = Path(output_path, f'{image_name[:-4].replace(\".\", \"\")}-row-{row_id}.jpg')\n", " save_row(img, words, top, height, left, col_width, w, row_file)\n", " row_id += 1\n", " if save_markup:\n", " markup_img = Path(rowcol_path, f'{image_name[:-4]}-rows.jpg')\n", " cv2.imwrite(str(markup_img), new_img)\n", " return col_data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Process directories" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "input_path = Path('/Volumes/bigdata/mydata/stockexchange/processed')\n", "start_vol = 103\n", "\n", "for vol_dir in tqdm([d for d in input_path.glob('*') if d.is_dir()], desc='Directories'):\n", " # print(img_dir)\n", " output_path = Path(vol_dir, 'rows')\n", " rowcol_path = Path(output_path, 'cols')\n", " rowcol_path.mkdir(parents=True, exist_ok=True)\n", " vol_num = int(re.search(r'(\\d+)$', vol_dir.name).group(1))\n", " vol_data = []\n", " columns_dir = Path(vol_dir, 'columns')\n", " if vol_num >= start_vol:\n", " for img_name in tqdm([i for i in columns_dir.glob('*.jpg')]):\n", " # print(img_name)\n", " vol_data += find_text(vol_dir, img_name, save_markup=True)\n", " df_text = pd.DataFrame(vol_data).sort_values(by=['vol_id', 'page_id', 'col_id', 'row_id'])\n", " df_text.to_csv(f'vol-{vol_num}-text.csv')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }