import cv2
|
|
import serial
|
|
import threading
|
|
import time
|
|
|
|
class Frame():
|
|
def __init__(self, time, data):
|
|
self.time = time
|
|
self.data = data
|
|
|
|
class GridEye():
|
|
def __init__(self, serialPort, baudrate):
|
|
self.port = serial.Serial(serialPort, baudrate)
|
|
self.frame = None
|
|
self.reading = True
|
|
self.thread = threading.Thread(target = self.reader)
|
|
self.thread.setDaemon(True)
|
|
self.lock = threading.Lock()
|
|
|
|
def start(self):
|
|
self.port.reset_input_buffer()
|
|
self.thread.start()
|
|
|
|
def stop(self):
|
|
self.reading = False
|
|
self.thread.join()
|
|
|
|
def reader(self):
|
|
data = []
|
|
data_time = 0;
|
|
while (self.reading):
|
|
line = b''
|
|
while (self.reading):
|
|
c = self.port.read()
|
|
if c == b'\r':
|
|
c = self.port.read()
|
|
break
|
|
if c == b'\n':
|
|
break
|
|
line += c
|
|
#line = self.port.readline()#.decode('utf-8')
|
|
# if line:
|
|
# print (line)
|
|
# time.sleep(0.01)
|
|
# if self.port.in_waiting > 0:
|
|
# print (self.port.in_waiting)
|
|
if b'#' in line:
|
|
if len(data) == 8:
|
|
#print (data)
|
|
self.lock.acquire()
|
|
self.frame = Frame(data_time, data)
|
|
self.lock.release()
|
|
else:
|
|
print ('something wrong', len(data))
|
|
data_time = time.time()
|
|
data = []
|
|
else:
|
|
try:
|
|
row = [float(x) for x in line.split()]
|
|
if len(row) == 8:
|
|
data.append(row)
|
|
except ValueError as e:
|
|
print ('error', e)
|
|
data_time = time.time()
|
|
data = []
|
|
if len(data) > 8:
|
|
data.pop(0)
|
|
|
|
class Distance():
|
|
def __init__(self, serialPort, baudrate):
|
|
self.port = serial.Serial(serialPort, baudrate)
|
|
self.distance = 200
|
|
self.reading = True
|
|
self.thread = threading.Thread(target = self.reader)
|
|
self.thread.setDaemon(True)
|
|
self.lock = threading.Lock()
|
|
|
|
def start(self):
|
|
self.port.reset_input_buffer()
|
|
self.thread.start()
|
|
|
|
def stop(self):
|
|
self.reading = False
|
|
self.thread.join()
|
|
|
|
def reader(self):
|
|
while (self.reading):
|
|
line = b''
|
|
while (self.reading):
|
|
c = self.port.read()
|
|
if c == b'\r':
|
|
c = self.port.read()
|
|
break
|
|
if c == b'\n':
|
|
break
|
|
line += c
|
|
if b'Distance' in line:
|
|
try:
|
|
dist = float(line.decode('utf-8').split(':')[1])
|
|
print (dist)
|
|
if dist > 200.0:
|
|
dist = 200.0
|
|
self.lock.acquire()
|
|
self.distance = dist
|
|
self.lock.release()
|
|
except ValueError as e:
|
|
print ('error', e)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
import cv2
|
|
import numpy as np
|
|
import math
|
|
def exponential(img, value):
|
|
tmp = cv2.pow(img.astype(np.double), value)*(255.0/(255.0**value))
|
|
return tmp.astype(np.uint8)
|
|
|
|
SIZE = 128
|
|
overlap = 120
|
|
AVERAGE_FRAME = 10
|
|
distanceBetweenSensors = 7.7 #cm
|
|
distance2Object = 60.0 #cm
|
|
ADJUST_BACK = 5
|
|
EXPONENTAL_VALUE = 0.4
|
|
|
|
offset = (distanceBetweenSensors / (2*distance2Object*math.tan(30.0/180.0*math.pi))) * SIZE
|
|
overlap = int(SIZE - offset)
|
|
print (overlap)
|
|
grideye = GridEye('COM15', 115200)
|
|
grideye.start()
|
|
grideye2 = GridEye('COM17', 115200)
|
|
grideye2.start()
|
|
|
|
distanceSensor = Distance('COM18', 9600)
|
|
distanceSensor.start()
|
|
|
|
fourcc = cv2.VideoWriter_fourcc(*'XVID')
|
|
videoWriter = cv2.VideoWriter('output.avi', fourcc, 10.0, (SIZE*2,SIZE*3))
|
|
cv2.imshow('sample', np.zeros((SIZE*3,SIZE*2), np.uint8))
|
|
aver1 = np.zeros((SIZE,SIZE), np.uint16)
|
|
aver2 = np.zeros((SIZE,SIZE), np.uint16)
|
|
cnt = 0
|
|
while True:
|
|
if grideye.frame and grideye2.frame:
|
|
grideye.lock.acquire()
|
|
grideye2.lock.acquire()
|
|
distanceSensor.lock.acquire()
|
|
frame = grideye.frame
|
|
grideye.frame = None
|
|
frame2 = grideye2.frame
|
|
grideye2.frame = None
|
|
# frame2 = frame
|
|
distance2Object = distanceSensor.distance
|
|
distanceSensor.lock.release()
|
|
grideye2.lock.release()
|
|
grideye.lock.release()
|
|
|
|
img = (np.array(frame.data)-15)*10
|
|
img = cv2.resize(img.astype(np.uint8), (SIZE,SIZE), interpolation = cv2.INTER_CUBIC) # INTER_LINEAR, INTER_CUBIC
|
|
img2 = (np.array(frame2.data)-15)*10
|
|
img2 = cv2.resize(img2.astype(np.uint8), (SIZE,SIZE), interpolation = cv2.INTER_CUBIC)
|
|
|
|
if cnt < AVERAGE_FRAME:
|
|
cnt += 1
|
|
aver1 += img
|
|
aver2 += img2
|
|
if cnt == AVERAGE_FRAME:
|
|
aver1 = aver1/AVERAGE_FRAME
|
|
aver1 = aver1.astype(np.uint8)
|
|
aver1 += ADJUST_BACK
|
|
aver2 = aver2/AVERAGE_FRAME
|
|
aver2 = aver2.astype(np.uint8)
|
|
aver2 += ADJUST_BACK
|
|
continue
|
|
img = cv2.subtract(img, aver1)
|
|
img2 = cv2.subtract(img2, aver2)
|
|
|
|
out = np.full((SIZE*3, SIZE*2), 255, dtype=np.uint16)
|
|
out[:SIZE, :SIZE] = img
|
|
out[:SIZE, SIZE:] = img2
|
|
|
|
|
|
overlap = int(SIZE - (distanceBetweenSensors / (2*distance2Object*math.tan(30.0/180.0*math.pi))) * SIZE)
|
|
offset = int(overlap/2)
|
|
# tmp = cv2.resize(img.astype(np.uint8), (SIZE*2-overlap, SIZE))
|
|
# tmp.astype(np.uint16)
|
|
tmp = np.zeros((SIZE, SIZE*2-overlap), dtype=np.uint16)
|
|
tmp[:, :SIZE] = img
|
|
tmp[:, -SIZE:] += img2
|
|
tmp[:, (SIZE-overlap): SIZE] = tmp[:, (SIZE-overlap): SIZE]/2
|
|
tmp = exponential(tmp, EXPONENTAL_VALUE)
|
|
out[SIZE:SIZE*2, offset: SIZE*2-overlap+offset] = tmp
|
|
# out[SIZE:SIZE*2, offset:SIZE+offset] = img
|
|
# out[SIZE:SIZE*2, (SIZE-overlap)+offset:SIZE+offset] += img2[:,:overlap]
|
|
# out[SIZE:SIZE*2, (SIZE-overlap)+offset:SIZE+offset] = out[SIZE:SIZE*2, (SIZE-overlap)+offset:SIZE+offset]/2
|
|
# out[SIZE:SIZE*2, SIZE+offset:SIZE+(SIZE-overlap)+offset] = img2[:,overlap:SIZE]
|
|
|
|
|
|
maxProduct = 0
|
|
overlap2 = 0
|
|
for i in range(80, 128):
|
|
product = sum(img[:,SIZE-i:].astype(np.uint32)*img2[:,:i].astype(np.uint32))
|
|
product = sum(product) / len(product)
|
|
if product > maxProduct:
|
|
maxProduct = product
|
|
overlap2 = i
|
|
|
|
|
|
|
|
offset = int(overlap2/2)
|
|
tmp = np.zeros((SIZE, SIZE*2-overlap2), dtype=np.uint16)
|
|
tmp[:, :SIZE] = img
|
|
tmp[:, -SIZE:] += img2
|
|
tmp[:, (SIZE-overlap2): SIZE] = tmp[:, (SIZE-overlap2): SIZE]/2
|
|
tmp = exponential(tmp, EXPONENTAL_VALUE)
|
|
out[SIZE*2:, offset: SIZE*2-overlap2+offset] = tmp
|
|
|
|
|
|
# out[SIZE*2:, offset:SIZE+offset] = img
|
|
# out[SIZE*2:, (SIZE-overlap2)+offset:SIZE+offset] += img2[:,:overlap2]
|
|
# out[SIZE*2:, (SIZE-overlap2)+offset:SIZE+offset] = out[SIZE*2:, (SIZE-overlap2)+offset:SIZE+offset]/2
|
|
# out[SIZE*2:, SIZE+offset:SIZE+(SIZE-overlap2)+offset] = img2[:,overlap2:SIZE]
|
|
out = out.astype(np.uint8)
|
|
|
|
cv2.imshow('sample', out)
|
|
videoWriter.write(cv2.cvtColor(out,cv2.COLOR_GRAY2BGR))
|
|
key = cv2.waitKey(1)
|
|
if key == ord('q'):
|
|
break
|
|
elif key == ord('a'):
|
|
overlap += 1
|
|
elif key == ord('d'):
|
|
overlap -= 1
|
|
elif key == ord('c'):
|
|
cv2.imwrite('out.jpg', out)
|
|
time.sleep(0.001)
|
|
grideye.stop()
|
|
grideye2.stop()
|
|
videoWriter.release()
|
|
cv2.destroyAllWindows()
|
|
|