from flask import Flask, request, Response from flask_restful import Resource, Api import os from object_detection.utils import label_map_util from object_detection.utils import visualization_utils as vis_util from object_detection.utils import ops as utils_ops from PIL import Image import base64 import io import json import re import tensorflow as tf import sys,getpass import numpy as np from flask import Flask, send_from_directory from flask_restful import Api from flask_cors import CORS, cross_origin app = Flask(__name__) api = Api(app) app.config['SECRET_KEY'] = 'the quick brown fox jumps over the lazy dog' app.config['CORS_HEADERS'] = 'Content-Type' cors = CORS(app, resources={r"/foo": {"origins": "*"}}) switches = {"coco":1, "damage":1} COCO_MODEL_NAME = "rfcn_resnet101_coco_2018_01_28" PATH_TO_FROZEN_COCO_GRAPH = 'modules/'+COCO_MODEL_NAME + '/frozen_inference_graph.pb' PATH_TO_FROZEN_DAMAGE_GRAPH = 'modules/trainedModels/ssd_mobilenet_RoadDamageDetector.pb' linux_def = {"detection_boxes":[(106, 188, 480, 452)],"detection_scores":[0.99],"detection_classes":[1]} detection_graph_coco = None detection_graph_damage = None if getpass.getuser() == "tedankara": detection_graph_coco = tf.Graph() detection_graph_damage = tf.Graph() with detection_graph_coco.as_default(): od_graph_def = tf.GraphDef() with tf.gfile.GFile(PATH_TO_FROZEN_COCO_GRAPH, 'rb') as fid: serialized_graph = fid.read() od_graph_def.ParseFromString(serialized_graph) tf.import_graph_def(od_graph_def, name='') with detection_graph_damage.as_default(): od_graph_def = tf.GraphDef() with tf.gfile.GFile(PATH_TO_FROZEN_DAMAGE_GRAPH, 'rb') as fid: serialized_graph = fid.read() od_graph_def.ParseFromString(serialized_graph) tf.import_graph_def(od_graph_def, name='') def load_image_into_numpy_array(image): (im_width, im_height) = image.size return np.array(image.getdata()).reshape( (im_height, im_width, 3)).astype(np.uint8) def decode_base64(data, altchars=b'+/'): """Decode base64, padding being optional. :param data: Base64 data as an ASCII byte string :returns: The decoded byte string. """ data = re.sub(rb'[^a-zA-Z0-9%s]+' % altchars, b'', bytes(data,"utf-8")) # normalize missing_padding = len(data) % 4 if missing_padding: data += b'='* (4 - missing_padding) return base64.b64decode(data, altchars) def run_inference_for_single_image(image, graph,type): global switches global sess_coco global sess_damage if not getpass.getuser() == "tedankara": return linux_def with graph.as_default(): if(switches[type]): if type == "coco": sess_coco = tf.Session() elif type == "damage": sess_damage = tf.Session() switches[type] = 0 if type == "coco": ops = tf.get_default_graph().get_operations() all_tensor_names = {output.name for op in ops for output in op.outputs} tensor_dict = {} for key in [ 'num_detections', 'detection_boxes', 'detection_scores', 'detection_classes', 'detection_masks' ]: tensor_name = key + ':0' if tensor_name in all_tensor_names: tensor_dict[key] = tf.get_default_graph().get_tensor_by_name( tensor_name) if 'detection_masks' in tensor_dict: # The following processing is only for single image detection_boxes = tf.squeeze(tensor_dict['detection_boxes'], [0]) detection_masks = tf.squeeze(tensor_dict['detection_masks'], [0]) # Reframe is required to translate mask from box coordinates to image coordinates and fit the image size. real_num_detection = tf.cast(tensor_dict['num_detections'][0], tf.int32) detection_boxes = tf.slice(detection_boxes, [0, 0], [real_num_detection, -1]) detection_masks = tf.slice(detection_masks, [0, 0, 0], [real_num_detection, -1, -1]) detection_masks_reframed = utils_ops.reframe_box_masks_to_image_masks( detection_masks, detection_boxes, image.shape[1], image.shape[2]) detection_masks_reframed = tf.cast( tf.greater(detection_masks_reframed, 0.5), tf.uint8) # Follow the convention by adding back the batch dimension tensor_dict['detection_masks'] = tf.expand_dims( detection_masks_reframed, 0) image_tensor = tf.get_default_graph().get_tensor_by_name('image_tensor:0') # Run inference output_dict = sess_coco.run(tensor_dict, feed_dict={image_tensor: image}) # all outputs are float32 numpy arrays, so convert types as appropriate output_dict['num_detections'] = int(output_dict['num_detections'][0]) output_dict['detection_classes'] = output_dict[ 'detection_classes'][0].astype(np.int64) output_dict['detection_boxes'] = output_dict['detection_boxes'][0] output_dict['detection_scores'] = output_dict['detection_scores'][0] if 'detection_masks' in output_dict: output_dict['detection_masks'] = output_dict['detection_masks'][0] elif type=="damage": image_tensor = graph.get_tensor_by_name('image_tensor:0') # Each box represents a part of the image where a particular object was detected. detection_boxes = graph.get_tensor_by_name('detection_boxes:0') # Each score represent how level of confidence for each of the objects. # Score is shown on the result image, together with the class label. detection_scores = graph.get_tensor_by_name('detection_scores:0') detection_classes = graph.get_tensor_by_name('detection_classes:0') num_detections = graph.get_tensor_by_name('num_detections:0') # Actual detection. (boxes, scores, classes, num) = sess_damage.run( [detection_boxes, detection_scores, detection_classes, num_detections], feed_dict={image_tensor: image}) output_dict = {'detection_classes': np.squeeze(classes).astype(np.int32), 'detection_scores': np.squeeze(scores),'detection_boxes': np.squeeze(boxes)} return output_dict class Process(Resource): def post(self): base64_img = request.form['img'] image = Image.open(io.BytesIO(decode_base64(base64_img))) type = request.form["type"] image_np = load_image_into_numpy_array(image) image_np_expanded = np.expand_dims(image_np, axis=0) if type == "coco": output_dict = run_inference_for_single_image(image_np_expanded, detection_graph_coco,type) elif type == "damage": output_dict = run_inference_for_single_image(image_np_expanded, detection_graph_damage,type) if getpass.getuser() == "tedankara": output_dict["detection_boxes"] = output_dict["detection_boxes"].tolist() output_dict["detection_scores"] = output_dict["detection_scores"].tolist() output_dict["detection_classes"] = output_dict["detection_classes"].tolist() return output_dict class NumpyEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, np.ndarray): return obj.tolist() return json.JSONEncoder.default(self, obj) if __name__ == '__main__': context = ('encryption/mycity.crt', 'encryption/mycity-decrypted.key') api.add_resource(Process, '/ai', '/ai/') app.run(host='0.0.0.0', port=5001, ssl_context=context, debug=False)