|
|
- import json
-
class Frame:
    """Pairs a timestamp with the data that was recorded under it."""

    def __init__(self, time, data):
        # Both values are stored as given; nothing is copied or validated.
        self.time, self.data = time, data
-
class GridEyeData():
    """Sequential reader for a recorded sensor log.

    Each record in the file is one line holding a float timestamp followed by
    one JSON-encoded line per sensor.  ``readFrame`` loads the next record into
    ``self.frames``.

    The object can be used as a context manager so the underlying file is
    closed deterministically::

        with GridEyeData(path) as gridEye:
            while gridEye.readFrame():
                ...
    """

    def __init__(self, filePath, sensorCount=4):
        """Open ``filePath`` for reading.

        Args:
            filePath: Path of the recording to read.
            sensorCount: Number of JSON lines that follow every timestamp
                line.  Defaults to 4, the layout this class always assumed.
        """
        self.f = open(filePath, 'r')
        self.frames = [None]*sensorCount

    def close(self):
        """Close the underlying file; calling it more than once is harmless."""
        self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, excType, excValue, traceback):
        self.close()
        return False  # never swallow the caller's exception

    def readFrame(self):
        """Read the next record into ``self.frames``.

        Returns:
            True when a complete record was read, False at end of data.  A
            blank timestamp line or a record cut short by the end of the file
            counts as end of data, and ``self.frames`` is left untouched in
            that case.

        Raises:
            ValueError: If the timestamp line is not a float, or a data line
                is not valid JSON (``json.JSONDecodeError`` is a subclass).
        """
        timeLine = self.f.readline()
        if not timeLine.strip():
            return False
        time = float(timeLine)

        # Parse the whole record first so a truncated record never leaves
        # self.frames half old, half new.
        readings = []
        for _ in range(len(self.frames)):
            line = self.f.readline()
            if not line.strip():
                return False
            readings.append(json.loads(line))

        for i, data in enumerate(readings):
            self.frames[i] = Frame(time, data)
        return True
-
def noiseStatistics(rawSum, rawSquareSum, frameCount):
    """Return the per-element mean and standard deviation from running sums.

    Args:
        rawSum: NumPy array with the element-wise sum of the samples.
        rawSquareSum: NumPy array with the element-wise sum of the squared
            samples.
        frameCount: Number of samples accumulated into both sums.

    Returns:
        A ``(mean, std)`` tuple of arrays shaped like the inputs.
    """
    # Imported here so importing this module still does not require NumPy.
    import numpy as np

    mean = rawSum/frameCount
    # E[x^2] - E[x]^2 can land a hair below zero through rounding; clamp it so
    # the square root never yields NaN.
    variance = np.maximum(rawSquareSum/frameCount - mean**2, 0.0)
    return mean, variance**0.5


if __name__ == '__main__':
    # Replays a recording (path given as the first command-line argument).
    # With STATS_ONLY enabled it prints noise statistics for the first
    # AVERAGE_FRAME records and exits; otherwise it goes on to render the four
    # background-subtracted sensor images plus a merged view to a window and
    # to 'output.avi'.
    import cv2
    import numpy as np
    import sys

    def exponential(img, value):
        """Raise an image to the power ``value``, rescaled so 255 stays 255."""
        tmp = cv2.pow(img.astype(np.double), value)*(255.0/(255.0**value))
        return tmp.astype(np.uint8)

    # Fail with a usage message (before creating output.avi) instead of an
    # IndexError when the recording path is missing.
    if len(sys.argv) < 2:
        sys.exit('usage: python ' + sys.argv[0] + ' <recording file>')

    SIZE = 128  # side length in pixels each sensor image is resized to
    AVERAGE_FRAME = 10  # leading records used for the background / statistics
    distanceBetweenSensors_w = 2.6 #cm
    distanceBetweenSensors_h = 2.6 #cm
    distance2Object = 60.0 #cm
    ADJUST_BACK = 5  # added to the averaged background before subtraction
    EXPONENTAL_VALUE = 0.4  # exponent handed to exponential() for display
    PRODUCTION_THRESHOLD = 10  # only referenced by the disabled code below
    MIN_EXIST_TIME = 0.1  # only referenced by the disabled code below
    # The script used to call exit() unconditionally once the statistics were
    # printed, which made all rendering code below unreachable.  The flag
    # keeps that as the default while making the behaviour explicit.
    STATS_ONLY = True
    cnt = 0

    raw_aver = np.zeros(64*4, np.float64)   # running sum of the raw readings
    raw_aver2 = np.zeros(64*4, np.float64)  # running sum of their squares

    hasLastFrame = False
    hasPos = False
    innerHasPos = False
    endTime = 0
    startTime = 0
    innerEndTime = 0
    innerStartTime = 0
    path = []
    speed = 0
    avers = [np.zeros((SIZE,SIZE), np.uint16) for _ in range(4)]

    # Open the input first so a bad path does not leave an empty output.avi.
    gridEye = GridEyeData(sys.argv[1])
    videoWriter = None
    try:
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        videoWriter = cv2.VideoWriter('output.avi', fourcc, 10.0, (SIZE*2,SIZE*4))
        cv2.imshow('sample', np.zeros((SIZE*4,SIZE*2), np.uint8))

        while gridEye.readFrame():
            frames = gridEye.frames
            imgs = []
            raw = []
            for frame in frames:
                readings = np.array(frame.data)
                # Clamp before the 8-bit cast so readings outside the
                # displayable range saturate instead of wrapping around.
                img = np.clip((readings-15)*10, 0, 255).astype(np.uint8)
                img = cv2.resize(img, (SIZE,SIZE), interpolation = cv2.INTER_LINEAR) # INTER_LINEAR, INTER_CUBIC
                imgs.append(img)
                raw.append(readings.ravel())
            raw = np.concatenate(raw)
            raw_aver += raw
            raw_aver2 += raw**2

            if cnt < AVERAGE_FRAME:
                # Still collecting the background sample.
                cnt += 1
                for i, img in enumerate(imgs):
                    avers[i] += img
                if cnt == AVERAGE_FRAME:
                    mean, std = noiseStatistics(raw_aver, raw_aver2, AVERAGE_FRAME)
                    print ('aver', mean)
                    print (std)
                    print (sum(std)/len(std))
                    if STATS_ONLY:
                        # sys.exit, not the site-provided exit(), which is
                        # meant for interactive use and may be absent.
                        sys.exit()
                    for i in range(len(avers)):
                        avers[i] = avers[i]/AVERAGE_FRAME
                        avers[i] = avers[i].astype(np.uint8)
                        avers[i] += ADJUST_BACK
                continue

            for i in range(len(imgs)):
                imgs[i] = cv2.subtract(imgs[i], avers[i])
            # Top half of the canvas: the four sensor images in a 2x2 grid.
            out = np.full((SIZE*4, SIZE*2), 255, dtype=np.uint16)
            out[:SIZE, :SIZE] = imgs[0]
            out[:SIZE, SIZE:SIZE*2] = imgs[1]
            out[SIZE:SIZE*2, :SIZE] = imgs[2]
            out[SIZE:SIZE*2, SIZE:SIZE*2] = imgs[3]

            # production
            # Disabled experiment, kept for reference: search for the overlap
            # that maximises the product of neighbouring images.
            '''
            maxProduct = 0
            overlap_w = 0
            for i in range(80, 128):
                product = sum(imgs[0][:,SIZE-i:].astype(np.uint32)*imgs[1][:,:i].astype(np.uint32))
                product += sum(imgs[2][:,SIZE-i:].astype(np.uint32)*imgs[3][:,:i].astype(np.uint32))
                product = sum(product) / len(product)
                if product > maxProduct:
                    maxProduct = product
                    overlap_w = i
            tmp = maxProduct
            maxProduct = 0
            overlap_h = 0
            for i in range(80, 128):
                product = sum(imgs[0][SIZE-i:, :].astype(np.uint32)*imgs[2][:i,:].astype(np.uint32))
                product += sum(imgs[1][SIZE-i:, :].astype(np.uint32)*imgs[3][:i,:].astype(np.uint32))
                product = sum(product) / len(product)
                if product > maxProduct:
                    maxProduct = product
                    overlap_h = i
            maxProduct = (tmp + maxProduct)/2

            # fixed overlap_h
            '''

            maxProduct = 0
            overlaps = 125
            overlap_w = overlaps
            overlap_h = overlaps
            # Disabled experiment, kept for reference: product at the fixed overlap.
            '''
            product = sum(imgs[0][:,SIZE-overlaps:].astype(np.uint32)*imgs[1][:,:overlaps].astype(np.uint32))
            product += sum(imgs[2][:,SIZE-overlaps:].astype(np.uint32)*imgs[3][:,:overlaps].astype(np.uint32))
            product = sum(product) / len(product)
            maxProduct = product
            tmp = maxProduct
            maxProduct = 0
            product = sum(imgs[0][SIZE-overlaps:, :].astype(np.uint32)*imgs[2][:overlaps,:].astype(np.uint32))
            product += sum(imgs[1][SIZE-overlaps:, :].astype(np.uint32)*imgs[3][:overlaps,:].astype(np.uint32))
            product = sum(product) / len(product)
            maxProduct = product
            maxProduct = (tmp + maxProduct)/2
            '''

            #if maxProduct > PRODUCTION_THRESHOLD:
            if True:
                # Merge each row of two images horizontally, averaging the
                # columns where they overlap.
                tmp = np.zeros((SIZE, SIZE*2-overlap_w), dtype=np.uint16)
                tmp[:, :SIZE] = imgs[0]
                tmp[:, -SIZE:] += imgs[1]
                tmp[:, (SIZE-overlap_w): SIZE] = tmp[:, (SIZE-overlap_w): SIZE]/2

                tmp2 = np.zeros((SIZE, SIZE*2-overlap_w), dtype=np.uint16)
                tmp2[:, :SIZE] = imgs[2]
                tmp2[:, -SIZE:] += imgs[3]
                tmp2[:, (SIZE-overlap_w): SIZE] = tmp2[:, (SIZE-overlap_w): SIZE]/2

                # Then merge the two strips vertically the same way.
                merge = np.zeros((SIZE*2-overlap_h, SIZE*2-overlap_w), dtype=np.uint16)
                merge[:SIZE, :] = tmp
                merge[-SIZE:, :] += tmp2
                merge[(SIZE-overlap_h):SIZE, :] = merge[(SIZE-overlap_h):SIZE, :]/2
                offset_w = int(overlap_w/2)
                offset_h = int(overlap_h/2)
                # Bottom half of the canvas: the merged image, centred.
                out[SIZE*2+offset_h:SIZE*4-overlap_h+offset_h, offset_w: SIZE*2-overlap_w+offset_w] = merge
                # Disabled experiment, kept for reference: weighted-centroid
                # position tracking and speed estimation.
                '''
                position = [0,0]
                rows,cols = merge.shape

                for i in range(rows):
                    for j in range(cols):
                        position[0] += i*merge[i][j]
                        position[1] += j*merge[i][j]
                position[0] /= sum(sum(merge))
                position[1] /= sum(sum(merge))

                pos_w = 1.17*position[0] #distanceBetweenSensors_w/(SIZE-overlap_w)*position[0]
                pos_h = 1.17*position[1] #distanceBetweenSensors_h/(SIZE-overlap_h)*position[1]
                if not hasPos:
                    startPos = [pos_w, pos_h]
                    sp = position
                    path = []
                    truePath = []
                    times = []
                    startTime = frames[0].time
                    hasPos = True
                if not innerHasPos and pos_w >= 16 and pos_w <= 109 and pos_h >= 16 and pos_h <= 109:
                    innerStartPos = [pos_w, pos_h]
                    innerStartTime = frames[0].time
                    innerHasPos = True

                if pos_w >= 16 and pos_w <= 109 and pos_h >= 16 and pos_h <= 109:
                    innerEndPos = [pos_w, pos_h]
                    innerEndTime = frames[0].time
                elif innerHasPos:
                    if innerEndTime - innerStartTime > 0:
                        print (innerStartPos, innerEndPos)
                        print ('inner speed:', ((innerEndPos[0]-innerStartPos[0])**2+(innerEndPos[1]-innerStartPos[1])**2)**0.5/(innerEndTime - innerStartTime))
                        print ('time:', innerEndTime-innerStartTime)
                    innerHasPos = False

                endPos = [pos_w, pos_h]
                endTime = frames[0].time
                path.append(position)
                truePath.append(endPos)
                times.append(frames[0].time)
                '''
            elif hasPos:
                # Unreachable while the condition above is hard-wired to True;
                # it belongs to the disabled threshold check.
                if endTime - startTime > 0:
                    print (startPos, endPos)
                    print ('speed:', ((endPos[0]-startPos[0])**2+(endPos[1]-startPos[1])**2)**0.5/(endTime - startTime))
                    print ('time:', endTime-startTime)
                if innerHasPos and innerEndTime - innerStartTime > 0:
                    print (innerStartPos, innerEndPos)
                    print ('inner speed:', ((innerEndPos[0]-innerStartPos[0])**2+(innerEndPos[1]-innerStartPos[1])**2)**0.5/(innerEndTime - innerStartTime))
                    print ('time:', innerEndTime-innerStartTime)
                hasPos = False
                innerHasPos = False
            out = out.astype(np.uint8)
            out = exponential(out, EXPONENTAL_VALUE)

            out = cv2.cvtColor(out,cv2.COLOR_GRAY2BGR)
            # Disabled experiment, kept for reference: speed / path overlay.
            '''
            if endTime - startTime > MIN_EXIST_TIME:
                speed = ((endPos[0]-startPos[0])**2+(endPos[1]-startPos[1])**2)**0.5/(endTime - startTime)
                cv2.putText(out, f'{speed:.2f}',
                    (0, SIZE*2),cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
                speed = ((truePath[-1][0]-truePath[-2][0])**2+(truePath[-1][1]-truePath[-2][1])**2)**0.5/(times[-1] - times[-2])
                cv2.putText(out, f'{speed:.2f}',
                    (0, SIZE*2+30),cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
            if maxProduct > PRODUCTION_THRESHOLD:
                cv2.circle(out, (offset_w+int(position[1]), SIZE*2+offset_h+int(position[0])), 10, (255,0,0), 5)
                cv2.circle(out, (offset_w+int(sp[1]), SIZE*2+offset_h+int(sp[0])), 10, (0,255,0), 5)
                for i in range(len(path)-1):
                    cv2.line(out, (offset_w+int(path[i][1]), SIZE*2+offset_h+int(path[i][0])), (offset_w+int(path[i+1][1]), SIZE*2+offset_h+int(path[i+1][0])), (0,0,255))
                cv2.line
                lastFrame = out[SIZE*2:,:]
                hasLastFrame = True
            elif hasLastFrame:
                out[SIZE*2:,:] = lastFrame
            '''
            cv2.imshow('sample', out)
            videoWriter.write(out)

            key = cv2.waitKey(1)
            if key == ord('q'):
                break
    finally:
        # Runs on normal completion, on 'q', on sys.exit() and on errors, so
        # the input file, the video file and the windows are always released.
        gridEye.f.close()
        if videoWriter is not None:
            videoWriter.release()
        cv2.destroyAllWindows()
|