from modules import utils
|
|
|
|
from flask import Flask, request, Response
|
|
from flask_restful import Resource, Api
|
|
|
|
from PIL import Image
|
|
import cv2
|
|
|
|
import base64
|
|
import json
|
|
import sys
|
|
import os
|
|
import io
|
|
import itertools
|
|
import pickle
|
|
import copy
|
|
from urllib.parse import urlencode
|
|
from urllib.request import Request, urlopen
|
|
import ssl
|
|
from object_detection.utils import label_map_util
|
|
import face_recognition
|
|
|
|
AI_IP = '10.10.26.161'
|
|
|
|
VEHICLE_CLASSES = [3, 6, 8]
|
|
MIN_AREA_RATIO = 0.9
|
|
import numpy as np
|
|
|
|
MIN_SCORE_THRESH = 0.6
|
|
|
|
if sys.platform == "win32":
|
|
sys.path.insert(0, r'C:\Users\Tednokent01\Downloads\MyCity\traffic_analyzer')
|
|
|
|
PATH_TO_LABELS = os.path.join('object_detection/data', 'mscoco_label_map.pbtxt')
|
|
|
|
|
|
category_index = label_map_util.create_category_index_from_labelmap(PATH_TO_LABELS, use_display_name=True)
|
|
|
|
app = Flask(__name__)
|
|
api = Api(app)
|
|
|
|
db_path = os.path.join(app.root_path, 'databases', 'crashes.json')
|
|
with open(db_path, 'r') as f:
|
|
crashes = json.load(f)
|
|
|
|
users_path = os.path.join(app.root_path, 'databases', 'users.json')
|
|
with open(users_path, 'r') as f:
|
|
users = json.load(f)
|
|
|
|
def load_image_into_numpy_array(image):
|
|
(im_width, im_height) = image.size
|
|
return np.array(image.getdata()).reshape(
|
|
(im_height, im_width, 3)).astype(np.uint8)
|
|
|
|
context = ssl._create_unverified_context()
|
|
|
|
def find_name(image):
|
|
try:
|
|
known_faces = []
|
|
known_face_names = []
|
|
for v in users.values():
|
|
known_faces.append(np.array(v['face_encoding']))
|
|
known_face_names.append(v['id'])
|
|
|
|
face_encoding = face_recognition.face_encodings(image)[0]
|
|
results = face_recognition.compare_faces(known_faces, face_encoding)
|
|
name = "Unknown"
|
|
face_distances = face_recognition.face_distance(known_faces, face_encoding)
|
|
best_match_index = np.argmin(face_distances)
|
|
if results[best_match_index]:
|
|
name = known_face_names[best_match_index]
|
|
|
|
return name
|
|
except:
|
|
return None
|
|
|
|
def rotate_img(img, angle):
|
|
if angle == 90:
|
|
return np.rot90(img)
|
|
elif angle == 270:
|
|
return np.rot90(np.rot90(np.rot90(img)))
|
|
|
|
def process_img(img_base64):
|
|
url = 'https://{}:5001/ai'.format(AI_IP) # Set destination URL here
|
|
post_fields = {'img': img_base64,"type":"coco"} # Set POST fields here
|
|
request = Request(url, urlencode(post_fields).encode())
|
|
data = urlopen(request, context=context).read().decode("ascii")
|
|
output_dict = json.loads(json.loads(data))
|
|
image_np = cv2.cvtColor(load_image_into_numpy_array(Image.open(io.BytesIO(base64.b64decode(img_base64)))),cv2.COLOR_RGB2BGR)
|
|
|
|
output_dict_processed = {"detection_classes":[], "detection_scores":[], "detection_boxes":[]}
|
|
im_height, im_width, _ = image_np.shape
|
|
cars_involved = 0
|
|
injured_people = 0
|
|
prev_cars = []
|
|
boxes = []
|
|
|
|
spam_boxes = []
|
|
for index, i in enumerate(output_dict['detection_classes']):
|
|
score = output_dict['detection_scores'][index]
|
|
if score > MIN_SCORE_THRESH:
|
|
box = output_dict['detection_boxes'][index]
|
|
boxes.append(Box((box[1] * im_width, box[3] * im_width,
|
|
box[0] * im_height, box[2] * im_height),
|
|
i,index))
|
|
box_combinations = itertools.combinations(boxes,r=2)
|
|
for combination in box_combinations:
|
|
big = combination[0].get_bigger(combination[1])
|
|
if big and not big in spam_boxes:
|
|
spam_boxes.append(big)
|
|
for spam in spam_boxes:
|
|
boxes.remove(spam)
|
|
|
|
for box in boxes:
|
|
output_dict_processed["detection_classes"].append(output_dict["detection_classes"][box.index])
|
|
output_dict_processed["detection_scores"].append(output_dict["detection_scores"][box.index])
|
|
output_dict_processed["detection_boxes"].append(output_dict["detection_boxes"][box.index])
|
|
|
|
people = {}
|
|
for index, i in enumerate(output_dict['detection_classes']):
|
|
score = output_dict['detection_scores'][index]
|
|
if score > MIN_SCORE_THRESH:
|
|
if i in VEHICLE_CLASSES:
|
|
box = output_dict['detection_boxes'][index]
|
|
(left, right, top, bottom) = (box[1] * im_width, box[3] * im_width,
|
|
box[0] * im_height, box[2] * im_height)
|
|
avg_x = left+right/2
|
|
avg_y = top+bottom/2
|
|
same = False
|
|
for prev_x, prev_y in prev_cars:
|
|
if abs(prev_x-avg_x) < 130 and abs(prev_y-avg_y) < 130:
|
|
same = True
|
|
break
|
|
if not same:
|
|
cars_involved += 1
|
|
prev_cars.append((avg_x, avg_y))
|
|
elif i == 1:
|
|
continue
|
|
box = output_dict['detection_boxes'][index]
|
|
(left, right, top, bottom) = tuple(map(int, (box[1] * im_width, box[3] * im_width,
|
|
box[0] * im_height, box[2] * im_height)))
|
|
person = image_np[int(top):int(bottom),int(left):int(right)]
|
|
if right-left > bottom-top:
|
|
rotated = rotate_img(person, 90)
|
|
name = None
|
|
try:
|
|
face_locs = face_recognition.face_locations(rotated)[0]
|
|
name = find_name(rotated)
|
|
except Exception:
|
|
pass
|
|
(height_person,width_person) = person.shape[:2]
|
|
|
|
if name is None:
|
|
rotated = rotate_img(person, 270)
|
|
face_locs = face_recognition.face_locations(rotated)[0]
|
|
name = find_name(rotated)
|
|
(top_face, right_face, bottom_face, left_face) = face_locs
|
|
face_locs_processed = (top + height_person - right_face,left+bottom_face,top + height_person - left_face,left+top_face)
|
|
else:
|
|
(top_face, right_face, bottom_face, left_face) = face_locs
|
|
person = cv2.rectangle(person, (width_person - bottom_face, left_face), (width_person - top_face, right_face), (0, 255, 0), 3)
|
|
face_locs_processed = (top + left_face,left + width_person - top_face,top + right_face,left + width_person - bottom_face)
|
|
people[index] = [0, face_locs_processed, name]
|
|
else:
|
|
face_locs = face_recognition.face_locations(person)[0]
|
|
(top_face, right_face, bottom_face, left_face) = face_locs
|
|
face_locs_processed = (top+face_locs[0],left+face_locs[1],top+face_locs[2],left+face_locs[3])
|
|
name = find_name(person)
|
|
people[index] = [1, face_locs_processed, name]
|
|
|
|
|
|
|
|
|
|
_, buffer = cv2.imencode('.jpg', image_np)
|
|
for i in range(len(output_dict_processed["detection_classes"])):
|
|
box = output_dict_processed["detection_boxes"][i]
|
|
output_dict_processed["detection_boxes"][i] = [box[1] * im_width, box[3] * im_width, box[0] * im_height, box[2] * im_height]
|
|
|
|
output_dict_processed["detection_classes"][i] = category_index[output_dict_processed["detection_classes"][i]]["name"]
|
|
|
|
return base64.b64encode(buffer).decode('ascii'), cars_involved, injured_people,output_dict_processed,people
|
|
|
|
|
|
class Crash(Resource):
|
|
def post(self):
|
|
message = request.form['message']
|
|
base64_img = request.form['img']
|
|
id = request.form['id']
|
|
lat, long = request.form['lat'], request.form['long']
|
|
|
|
image, car_count, injured,out,people = process_img(base64_img)
|
|
(top, right, bottom, left) = people[0][1]
|
|
top = int(top)
|
|
right = int(right)
|
|
left = int(left)
|
|
bottom = int(bottom)
|
|
img = load_image_into_numpy_array(Image.open(io.BytesIO(base64.b64decode(base64_img))))
|
|
cv2.rectangle(img,(left,top),(right,bottom),(0,255,0),3)
|
|
cv2.imshow('test.jpg', img)
|
|
cv2.waitKey(0)
|
|
cv2.destroyAllWindows()
|
|
print(people)
|
|
priority = car_count + injured
|
|
if priority > 10:
|
|
priority = 10
|
|
|
|
crash = {
|
|
'img': image,
|
|
'message': message,
|
|
'priority': priority,
|
|
'stats': {
|
|
'cars': car_count,
|
|
'injured': injured
|
|
},
|
|
'location': {
|
|
'latitude': lat,
|
|
'longitude': long
|
|
},
|
|
"output_dict": out,
|
|
"people":people
|
|
}
|
|
if id in crashes:
|
|
crashes[id].append(crash)
|
|
else:
|
|
crashes[id] = [crash]
|
|
|
|
with open(db_path, 'w') as f:
|
|
json.dump(crashes, f, indent=2)
|
|
return crash
|
|
|
|
class Crashes(Resource):
|
|
def post(self):
|
|
process_dict = copy.deepcopy(crashes)
|
|
return_dict = {}
|
|
for id in process_dict:
|
|
for i in range(len(process_dict[id])):
|
|
del process_dict[id][i]["img"]
|
|
|
|
for id in process_dict:
|
|
for i in range(len(process_dict[id])):
|
|
location = process_dict[id][i]['location']
|
|
lat, lng = float(request.form['lat']), float(request.form['lng'])
|
|
if abs(float(location['latitude']) - lat) < 0.3 and abs(float(location['longitude']) - lng) < 0.3:
|
|
if id in return_dict:
|
|
return_dict[id].append(process_dict[id][i])
|
|
else:
|
|
return_dict[id] = [process_dict[id][i]]
|
|
|
|
return return_dict
|
|
|
|
|
|
class Box:
|
|
def __init__(self,coords, type,index):
|
|
self.x1 = coords[0]
|
|
self.y1 = coords[2]
|
|
self.x2 = coords[1]
|
|
self.y2 = coords[3]
|
|
self.area = (self.x2-self.x1) * (self.y2-self.y1)
|
|
self.type = type
|
|
self.index = index
|
|
|
|
def get_bigger(self,box):
|
|
if box.type != self.type:
|
|
return None
|
|
left = max(box.x1, self.x1)
|
|
right = min(box.x2, self.x2)
|
|
bottom = max(box.y2, self.y2)
|
|
top = min(box.y1, self.y1)
|
|
|
|
if not left < right and bottom < top:
|
|
return None
|
|
area_temp = abs((right-left)*(top-bottom))
|
|
if abs((right-left)*(top-bottom))/((box.area * (box.area < self.area)) + (self.area * (box.area > self.area))) < MIN_AREA_RATIO:
|
|
return None
|
|
|
|
if box.area > self.area:
|
|
return box
|
|
else:
|
|
return self
|
|
|
|
|
|
|
|
|
|
|