# REACTION

Breast Cancer Detection based on Mammography Image Using Convolutional Neural Network Algorithm

In [None]:
from ipywidgets import Layout, Button, Box, FileUpload, Label, Checkbox, IntSlider, IntText, Button
import ipywidgets as widgets
from IPython.display import display, Image
import threading
import time

import tensorflow as tf
#
from keras.preprocessing import image

# uploader to image cv2
import numpy as np
import cv2
import matplotlib.pyplot as plt
 
import os

import plotly
import plotly.graph_objects as go
import plotly.express as px
from plotly.offline import iplot, init_notebook_mode

In [None]:
# Upload widget for the mammography image: image files only, one file at a time.
uploader = widgets.FileUpload(accept='image/*', multiple=False)

In [None]:
# Container for the uploaded image preview; starts empty and is filled by work().
image_container = widgets.VBox(children=())

In [None]:
# Wrapper stacking the upload widget above its preview container.
app = widgets.VBox(children=(uploader, image_container))

In [None]:
# uploader image content and extension
def get_content_and_extension(uploader):
    """Return the payload and file extension of the first uploaded file.

    ``uploader.value`` is read as a mapping whose entries hold a
    ``'content'`` item and a ``'metadata'`` dict with a ``'name'`` key.

    Returns:
        A ``(content, extension)`` tuple for the first entry, where the
        extension is the text after the last ``.`` of the file name, or
        ``None`` when the mapping is empty.
    """
    first_entry = next(iter(uploader.value.values()), None)
    if first_entry is None:
        return None
    file_name = first_entry['metadata']['name']
    return first_entry['content'], file_name.rsplit('.', 1)[-1]

In [None]:
# image widget from uploader
def image_widget_from_uploader(uploader):
    """Build an Image widget from the file held by an upload widget.

    Args:
        uploader: Upload widget whose ``value`` is truthy once a file has
            been uploaded.

    Returns:
        An Image widget showing the uploaded bytes, using the file
        extension as the image format. When nothing has been uploaded a
        warning is issued and an empty Image widget is returned.
    """
    # Local import: the file's visible import section does not bring in
    # ``warnings``, so the fallback branch used to fail with NameError.
    import warnings

    if uploader.value:
        file_content, file_extension = get_content_and_extension(uploader)
        return widgets.Image(value=file_content, format=file_extension)

    warnings.warn("NO IMAGE HAS BEEN UPLOADED", category=Warning)
    return widgets.Image()

In [None]:
# image widget from image numpy
def image_widget_from_numpy(image_numpy, extension):
    """Encode an image array and wrap the encoded bytes in an Image widget.

    Args:
        image_numpy: Image array handed to ``cv2.imencode``.
        extension: File extension without the leading dot; it selects the
            encoder and is also passed on as the widget's format.

    Returns:
        An Image widget holding the encoded image bytes.
    """
    # imencode returns (success_flag, buffer); only the buffer is used.
    encoded = cv2.imencode('.' + extension, image_numpy)[1]
    return widgets.Image(value=encoded.tobytes(), format=extension)

In [None]:
# image numpy from uploader
def image_numpy_from_uploader(uploader):
    """Decode the uploaded file into an OpenCV colour image.

    Returns:
        A ``(image, extension)`` tuple: the result of ``cv2.imdecode`` with
        ``cv2.IMREAD_COLOR`` and the uploaded file's extension.
    """
    raw_bytes, extension = get_content_and_extension(uploader)
    byte_array = np.frombuffer(raw_bytes, np.uint8)
    return cv2.imdecode(byte_array, cv2.IMREAD_COLOR), extension

In [None]:
def plotly_image(image_numpy):
    """Show an image in a plotly figure set up for drawing rectangles.

    A three-dimensional array with three channels has its channel axis
    reversed before display (presumably BGR to RGB for OpenCV images -
    confirm); anything else is shown with a gray colour scale. The figure
    has zero margins, starts in rectangle-drawing mode with cyan outlines,
    and gets draw-rectangle and erase-shape buttons in its mode bar.
    """
    dims = np.shape(image_numpy)
    if len(dims) == 3 and dims[2] == 3:
        fig = px.imshow(image_numpy[..., ::-1])
    else:
        fig = px.imshow(image_numpy, color_continuous_scale='gray')

    zero_margin = go.layout.Margin(l=0, r=0, b=0, t=0)

    # Set dragmode and newshape properties on top of the margin-free layout.
    fig.update_layout(
        go.Layout(margin=zero_margin),
        dragmode='drawrect',
        newshape=dict(line_color='cyan'),
    )

    # Add the drawing tools to the mode bar when displaying.
    fig.show(config={'modeBarButtonsToAdd': ['drawrect', 'eraseshape']})

In [None]:
# Empty vertical container; nothing in the visible code fills it yet.
image_plotly_container = widgets.VBox(children=())

In [None]:
# Load the trained Keras model once at start-up. The path is relative, so the
# file is looked up in the current working directory.
h5_path = 'modelcnn.h5'
model = tf.keras.models.load_model(h5_path)

In [None]:
# Upload widget for the annotated image: image files only, one file at a time.
annotate_image_uploader = widgets.FileUpload(accept='image/*', multiple=False)


In [None]:
# Container for the preprocessed annotated image; starts empty and is filled by work().
image_annotate_container = widgets.VBox(children=())

In [None]:
# Wrapper stacking the annotated-image upload widget above its preview container.
app_2 = widgets.VBox(children=(annotate_image_uploader, image_annotate_container))

In [None]:
# uploader image content and extension
def get_content_and_extension(annotate_image_uploader):
    """Return the payload and file extension of the first uploaded file.

    This redefines the function of the same name declared earlier in the
    file; the logic is the same and only the parameter name differs.

    ``annotate_image_uploader.value`` is read as a mapping whose entries
    hold a ``'content'`` item and a ``'metadata'`` dict with a ``'name'``
    key.

    Returns:
        A ``(content, extension)`` tuple for the first entry, where the
        extension is the text after the last ``.`` of the file name, or
        ``None`` when the mapping is empty.
    """
    first_entry = next(iter(annotate_image_uploader.value.values()), None)
    if first_entry is None:
        return None
    file_name = first_entry['metadata']['name']
    return first_entry['content'], file_name.rsplit('.', 1)[-1]

In [None]:
# image widget from annotate_image_uploader
def image_widget_from_uploader(annotate_image_uploader):
    """Build an Image widget from the file held by an upload widget.

    This redefines the function of the same name declared earlier in the
    file; only the parameter name differs.

    Args:
        annotate_image_uploader: Upload widget whose ``value`` is truthy
            once a file has been uploaded.

    Returns:
        An Image widget showing the uploaded bytes, using the file
        extension as the image format. When nothing has been uploaded a
        warning is issued and an empty Image widget is returned.
    """
    # Local import: the file's visible import section does not bring in
    # ``warnings``, so the fallback branch used to fail with NameError.
    import warnings

    if annotate_image_uploader.value:
        file_content, file_extension = get_content_and_extension(annotate_image_uploader)
        return widgets.Image(value=file_content, format=file_extension)

    warnings.warn("NO IMAGE HAS BEEN UPLOADED", category=Warning)
    return widgets.Image()

In [None]:
# image numpy from uploader
def image_numpy_from_uploader(annotate_image_uploader):
    """Decode the uploaded file into an OpenCV colour image.

    This redefines the function of the same name declared earlier in the
    file; only the parameter name differs.

    Returns:
        A ``(image, extension)`` tuple: the result of ``cv2.imdecode`` with
        ``cv2.IMREAD_COLOR`` and the uploaded file's extension.
    """
    raw_bytes, extension = get_content_and_extension(annotate_image_uploader)
    byte_array = np.frombuffer(raw_bytes, np.uint8)
    return cv2.imdecode(byte_array, cv2.IMREAD_COLOR), extension

In [None]:
def is_cyan(rgb):
    """Tell whether a pixel matches the annotation colour.

    A pixel matches when its first two channels are above 250 and its
    third channel is below 5.

    NOTE(review): despite the parameter name, the pixels passed in come
    from ``cv2.imdecode``, which presumably yields BGR order - in that
    order the test reads "blue and green high, red low", i.e. cyan.
    Confirm before reusing this on RGB data.

    Returns:
        ``True`` if the pixel matches, ``False`` otherwise.
    """
    first, second, third = rgb[0], rgb[1], rgb[2]
    return bool(first > 250 and second > 250 and third < 5)

In [None]:
def img_coordinate_1(image_numpy):
    """Locate the first annotation-coloured pixel in row-major order.

    The image is scanned from the top row down and, within a row, from
    left to right. The pixel test uses the same thresholds as
    ``is_cyan`` (channel 0 > 250, channel 1 > 250, channel 2 < 5) but is
    applied to the whole array at once instead of calling a Python
    function per pixel, which is very slow on large images.

    Args:
        image_numpy: Image of shape (rows, columns, channels) with at
            least three channels.

    Returns:
        ``(x1, y1)`` where ``x1`` is the row index and ``y1`` the column
        index of the first match, as plain ints. ``(0, 0)`` (the top-left
        corner) when no pixel matches.
    """
    pixels = np.asarray(image_numpy)
    cyan_mask = (
        (pixels[..., 0] > 250)
        & (pixels[..., 1] > 250)
        & (pixels[..., 2] < 5)
    )

    # flatnonzero walks the mask in row-major order, like the nested loops did.
    hits = np.flatnonzero(cyan_mask)
    if hits.size == 0:
        return 0, 0  # default, top left

    x1, y1 = divmod(int(hits[0]), cyan_mask.shape[1])
    return x1, y1

In [None]:
def img_coordinate_2(image_numpy):
    """Locate the last annotation-coloured pixel, measured from the bottom right.

    The original implementation flipped the image on both axes and looked
    for the first match in row-major order. That is the last match of the
    unflipped image, so the mask is computed once on the original array
    and the index is converted to distances from the bottom-right corner.
    This avoids both the flipped copy and a Python-level call per pixel.
    The thresholds are the same as in ``is_cyan`` (channel 0 > 250,
    channel 1 > 250, channel 2 < 5).

    Args:
        image_numpy: Image of shape (rows, columns, channels) with at
            least three channels.

    Returns:
        ``(x2, y2)`` where ``x2`` counts rows up from the last row and
        ``y2`` counts columns left from the last column, as plain ints.
        ``(1, 1)`` when no pixel matches.
    """
    pixels = np.asarray(image_numpy)
    cyan_mask = (
        (pixels[..., 0] > 250)
        & (pixels[..., 1] > 250)
        & (pixels[..., 2] < 5)
    )

    hits = np.flatnonzero(cyan_mask)
    if hits.size == 0:
        return 1, 1  # default, bottom right

    height, width = cyan_mask.shape
    row, col = divmod(int(hits[-1]), width)

    # Convert to the coordinates the pixel would have in the flipped image.
    return height - 1 - row, width - 1 - col

In [None]:
def img_crop(image_numpy, x1, y1, x2, y2, margin=4):
    """Crop an image to the inside of an annotated rectangle.

    Args:
        image_numpy: Image array; the first axis is rows, the second columns.
        x1: Row index of the rectangle's top-left corner.
        y1: Column index of the rectangle's top-left corner.
        x2: Number of rows between the bottom-right corner and the last row.
        y2: Number of columns between the bottom-right corner and the last
            column.
        margin: Extra pixels trimmed from every side, previously hard-coded
            to 4 (presumably the thickness of the drawn outline - confirm).

    Returns:
        The cropped view of ``image_numpy``; empty when the margins leave
        no pixels.
    """
    height, width = image_numpy.shape[:2]

    # Explicit stop indices instead of negative ones, so that a zero offset
    # (e.g. margin=0 with x2=0) does not collapse the slice to nothing.
    row_stop = max(height - x2 - margin, 0)
    col_stop = max(width - y2 - margin, 0)

    return image_numpy[x1 + margin:row_stop, y1 + margin:col_stop]

In [None]:
def image_reshape(image):
    """Turn a single-channel image into a one-item, three-channel batch.

    The image is resized to 256x256, converted with
    ``cv2.COLOR_GRAY2RGB`` and reshaped to ``(-1, 256, 256, 3)``.
    """
    resized = cv2.resize(image, (256, 256))
    three_channel = cv2.cvtColor(resized, cv2.COLOR_GRAY2RGB)
    return three_channel.reshape(-1, 256, 256, 3)

In [None]:
# def prepro_greyscale(image_numpy):
# return cv2.cvtColor(image_numpy, cv2.COLOR_BGR2GRAY)

def prepro_clahe(image_numpy, clipLimit=2.0, tileGridSize=8):
    """Convert a BGR image to grayscale and apply CLAHE to it.

    Args:
        image_numpy: Colour image passed to ``cv2.COLOR_BGR2GRAY``.
        clipLimit: Clip limit forwarded to ``cv2.createCLAHE``.
        tileGridSize: Side length of the square tile grid.

    Returns:
        The equalised grayscale image.
    """
    gray = cv2.cvtColor(image_numpy, cv2.COLOR_BGR2GRAY)
    grid = (tileGridSize, tileGridSize)
    equalizer = cv2.createCLAHE(clipLimit, grid)
    return equalizer.apply(gray)

def prepro_resize(image_numpy, resize_w=256, resize_l=256):
    """Resize an image with ``cv2.resize`` to ``(resize_w, resize_l)``."""
    target_size = (resize_w, resize_l)
    return cv2.resize(image_numpy, target_size)

def prepro_normalize(image_numpy, resize_w=256, resize_l=256):
    """Min-max normalise an image to the range 0-255.

    Args:
        image_numpy: Image to normalise.
        resize_w: First dimension of the zero array passed as destination.
        resize_l: Second dimension of the zero array passed as destination.

    Returns:
        The array returned by ``cv2.normalize`` with ``cv2.NORM_MINMAX``.
    """
    # NOTE(review): the destination is a float array of fixed size; whether
    # OpenCV writes into it or allocates a fresh output presumably depends
    # on the input's size and type - confirm.
    destination = np.zeros((resize_w, resize_l))
    return cv2.normalize(image_numpy, destination, 0, 255, cv2.NORM_MINMAX)

In [None]:
def predicted_image(img_reshape_2, model):
    """Run the model on a prepared batch and return a padded label string.

    Args:
        img_reshape_2: Input batch passed unchanged to ``model.predict``.
        model: Object exposing ``predict``; its result must offer ``any()``.

    Returns:
        ``' NORMAL '`` when every value of the prediction is zero,
        otherwise ``' KANKER '``.
    """
    scores = model.predict(img_reshape_2)

    # NOTE(review): any non-zero output counts as a positive. If the model
    # emits probabilities rather than hard 0/1 labels this is presumably
    # always 'KANKER' - confirm the output layer and consider a threshold.
    label = 'KANKER' if scores.any() else 'NORMAL'
    return f' {label} '

In [None]:
# Label that shows the pipeline status messages and, finally, the prediction.
text_result = widgets.Label(value="Result will be shown here!")

In [None]:
# Bordered box around the result label, laid out as a stretched flex column.
text_box_layout = Layout(
    display='flex',
    flex_flow='column',
    border='solid 2px',
    align_items='stretch',
)
text = widgets.VBox(children=[text_result], layout=text_box_layout)

In [None]:
# Outer container holding the bordered result box.
text_container = widgets.VBox(children=(text,))

In [None]:
# Row layout shared by the display boxes: label on one side, widget on the other.
form_item_layout = Layout(display='flex', flex_flow='row', justify_content='space-between')

In [None]:
# Status labels: one for the worker's on/off state, one for the thread check.
label_current_thread = widgets.Label()
label_thread_list = widgets.Label()

# Button that asks the worker loop to stop.
button_stop = widgets.Button(
    description='stop',
    icon='fa-stop',
    button_style='warning',
    layout=widgets.Layout(width='100px'),
)


def button_stop_click(self):
    """Click handler: clear the global flag polled by the worker loop."""
    global thread_status
    thread_status = False


button_stop.on_click(button_stop_click)

# Button that reports whether any thread named 'work' is still alive.
button_check_thread = widgets.Button(
    description='check thread',
    button_style='',
    layout=widgets.Layout(width='100px'),
)


def button_check_thread_click(self):
    """Click handler: count live threads named 'work' and report the result.

    The message is written to ``label_thread_list``.
    """
    name = 'work'
    work_count = sum(1 for running in threading.enumerate() if running.name == name)

    if work_count > 0:
        label_thread_list.value = f'{work_count} thread with name {name} still exist. Consider use exit() and restart kernel'
    else:
        label_thread_list.value = f'No thread with name {name} exist'


button_check_thread.on_click(button_check_thread_click)

In [None]:
# main loop
def work():
    """Poll both upload widgets until the global ``thread_status`` is cleared.

    Every 0.05 s the loop compares the current value of each upload widget
    with the value seen last time:

    * When ``uploader`` changes, the uploaded image is previewed in
      ``image_container`` and opened in a plotly figure for annotation.
    * When ``annotate_image_uploader`` changes, the image is cropped to the
      annotated rectangle, preprocessed (CLAHE, resize, normalise), shown
      in ``image_annotate_container`` and classified by ``model``; progress
      and the result are written to ``text_result``.

    An error while handling an upload is reported in ``text_result``
    instead of terminating the thread, so later uploads keep working.

    The last decoded or preprocessed image is kept in the global
    ``image_numpy``.
    """
    global image_numpy

    def show_status(message):
        # Same update sequence the loop used for every status message.
        text_result.value = message
        text.children = (text_result,)
        if text_result.value:
            text_container.children = (text,)

    if thread_status:
        label_current_thread.value = 'Thread On'

    current_state = [uploader.value]
    current_state_2 = [annotate_image_uploader.value]

    while thread_status:
        try:
            # check if there is a change in uploader.value
            if current_state != [uploader.value]:
                # renew state reference first so a failing upload is not retried forever
                current_state = [uploader.value]

                # show the raw upload
                image_container.children = (image_widget_from_uploader(uploader),)

                # read image numpy and open it for annotation
                image_numpy, extension = image_numpy_from_uploader(uploader)
                plotly_image(image_numpy)

            # check if there is a change in annotate_image_uploader.value
            if current_state_2 != [annotate_image_uploader.value]:
                # renew state reference first so a failing upload is not retried forever
                current_state_2 = [annotate_image_uploader.value]

                show_status('Preprocessing...')

                image_numpy, extension = image_numpy_from_uploader(annotate_image_uploader)

                # crop to the annotated rectangle
                x1, y1 = img_coordinate_1(image_numpy)
                x2, y2 = img_coordinate_2(image_numpy)
                image_numpy = img_crop(image_numpy, x1, y1, x2, y2)

                # preprocess
                image_numpy = prepro_clahe(image_numpy)
                image_numpy = prepro_resize(image_numpy)
                image_numpy = prepro_normalize(image_numpy)

                # the preview must be encoded before the array is reshaped into a batch
                image_numpy_widget = image_widget_from_numpy(image_numpy, extension)
                image_numpy = image_reshape(image_numpy)
                image_annotate_container.children = (image_numpy_widget,)

                show_status('Predicting...')
                show_status(predicted_image(image_numpy, model))
        except Exception as exc:
            # Thread boundary: report the failure and keep polling rather
            # than letting the worker die with a stale status on screen.
            show_status(f'Error: {exc}')

        # sleep for threading
        time.sleep(0.05)

    if not thread_status:
        label_current_thread.value = 'Thread Off'

In [None]:
# (Re)start the polling worker.
# Clear the flag and wait for a worker left over from an earlier run of this
# cell to leave its loop before raising the flag again. Flipping the flag to
# False and straight back to True is never observed by the old loop, so every
# re-run used to add another 'work' thread.
thread_status = False
_previous_worker = globals().get('thread_work')
if _previous_worker is not None and _previous_worker.is_alive():
    _previous_worker.join()
thread_status = True

# create thread
thread_work = threading.Thread(target=work, name='work')

# start thread
thread_work.start()

In [None]:
# Display row: caption plus the mammography upload widget and its preview.
Box(children=[Label(value='Upload Mammography'), app], layout=form_item_layout)

In [None]:
# Display row: caption plus the annotated-image upload widget and its preview.
Box(children=[Label(value='Upload Annotate Image'), app_2], layout=form_item_layout)

In [None]:
# Display row: caption plus the container showing status and prediction text.
Box(children=[Label(value='Result'), text_container], layout=form_item_layout)