videostream: add downscaler core + test

This commit is contained in:
Sebastien Bourdeauducq 2014-02-10 00:12:57 +01:00
parent 2a3803d3a1
commit 42c25f44ad
2 changed files with 132 additions and 15 deletions

View File

@ -8,7 +8,7 @@ class Chopper(Module):
def __init__(self, frac_bits):
self.p = Signal(frac_bits)
self.q = Signal(frac_bits)
self.chopper = Signal()
self.chopper = Signal(reset=1)
###
@ -27,7 +27,6 @@ class _ChopperTB(Module):
self.comb += self.dut.p.eq(320), self.dut.q.eq(681)
def gen_simulation(self, selfp):
yield
ones = 0
niter = 681
for i in range(niter):
@ -216,8 +215,15 @@ class Packer(Module):
)
class _CompacterPackerTB(Module):
def __init__(self, input_it):
self.input_it = input_it
def __init__(self):
self.test_seq = [
(42, 0), (32, 1), ( 4, 1), (21, 0),
(43, 1), (11, 1), ( 5, 1), (18, 0),
(71, 0), (70, 1), (30, 1), (12, 1),
( 3, 1), (12, 1), (21, 1), (10, 0),
( 1, 1), (87, 0), (72, 0), (12, 0)
]
self.input_it = iter(self.test_seq)
self.output = []
self.end_cycle = -1
@ -227,6 +233,8 @@ class _CompacterPackerTB(Module):
def do_simulation(self, selfp):
if selfp.simulator.cycle_counter == self.end_cycle:
print("got: " + str(self.output))
print("expected: " + str([value for value, keep in self.test_seq if keep]))
raise StopSimulation
# push values
@ -247,6 +255,122 @@ class _CompacterPackerTB(Module):
for i in range(4):
self.output.append(getattr(selfp.packer.o, "w"+str(i)))
class DownscalerCore(Module):
def __init__(self, base_layout, N, res_bits):
self.init = Signal()
self.ready = Signal()
self.ce = Signal()
self.hres_in = Signal(res_bits)
self.vres_in = Signal(res_bits)
self.i = Record([("w"+str(i), base_layout) for i in range(N)])
self.hres_out = Signal(res_bits)
self.vres_out = Signal(res_bits)
self.o = Record([("w"+str(i), base_layout) for i in range(N)])
self.stb = Signal()
###
packbits = log2_int(N)
hcounter = Signal(res_bits-packbits)
self.sync += If(self.init,
hcounter.eq(self.hres_in[packbits:] - 1)
).Elif(self.ce,
If(hcounter == 0,
hcounter.eq(self.hres_in[packbits:] - 1)
).Else(
hcounter.eq(hcounter - 1)
)
)
self.submodules.vselector = InsertReset(InsertCE(Chopper(res_bits)))
self.comb += [
self.vselector.reset.eq(self.init),
self.vselector.ce.eq(self.ce & (hcounter == 0)),
self.vselector.p.eq(self.vres_out),
self.vselector.q.eq(self.vres_in)
]
self.submodules.hselector = MultiChopper(N, res_bits)
self.comb += [
self.hselector.init.eq(self.init),
self.ready.eq(self.hselector.ready),
self.hselector.next.eq(self.ce),
self.hselector.p.eq(self.hres_out),
self.hselector.q.eq(self.hres_in)
]
self.submodules.compacter = InsertReset(InsertCE(Compacter(base_layout, N)))
self.submodules.packer = InsertReset(InsertCE(Packer(base_layout, N)))
self.comb += [
self.compacter.reset.eq(self.init),
self.packer.reset.eq(self.init),
self.compacter.ce.eq(self.ce),
self.packer.ce.eq(self.ce),
self.compacter.i.eq(self.i),
self.compacter.sel.eq(self.hselector.chopper & Replicate(self.vselector.chopper, N)),
self.packer.i.eq(self.compacter.o),
self.packer.count.eq(self.compacter.count),
self.o.eq(self.packer.o),
self.stb.eq(self.packer.stb)
]
def _img_iter(img):
for y in range(img.size[1]):
for x in range(img.size[0]):
newpix = yield img.getpixel((x, y))
if newpix is not None:
img.putpixel((x, y), newpix)
class _DownscalerCoreTB(Module):
def __init__(self):
layout = [("r", 8), ("g", 8), ("b", 8)]
self.submodules.dut = DownscalerCore(layout, 4, 11)
def gen_simulation(self, selfp):
from PIL import Image
import subprocess
dut = selfp.dut
im_in = Image.open("testpic_in.jpg")
im_out = Image.new("RGB", (320, 240))
print("initializing downscaler...")
dut.init = 1
dut.hres_in, dut.vres_in = im_in.size
dut.hres_out, dut.vres_out = im_out.size
yield
dut.init = 0
yield
while not dut.ready:
yield
print("done")
dut.ce = 1
it_in, it_out = _img_iter(im_in), _img_iter(im_out)
it_out.send(None)
while True:
try:
for i in range(4):
w = getattr(dut.i, "w"+str(i))
w.r, w.g, w.b = next(it_in)
except StopIteration:
pass
if dut.stb:
try:
for i in range(4):
w = getattr(dut.o, "w"+str(i))
it_out.send((w.r, w.g, w.b))
except StopIteration:
break
yield
im_out.save("testpic_out.png")
try:
subprocess.call(["tycat", "testpic_out.png"])
except OSError:
print("Image saved as testpic_out.png, but could not be displayed.")
pass
if __name__ == "__main__":
print("*** Testing chopper ***")
@ -256,14 +380,7 @@ if __name__ == "__main__":
run_simulation(_MultiChopperTB())
print("*** Testing compacter and packer ***")
test_seq = [
(42, 0), (32, 1), ( 4, 1), (21, 0),
(43, 1), (11, 1), ( 5, 1), (18, 0),
(71, 0), (70, 1), (30, 1), (12, 1),
( 3, 1), (12, 1), (21, 1), (10, 0),
( 1, 1), (87, 0), (72, 0), (12, 0)
]
tb = _CompacterPackerTB(iter(test_seq))
run_simulation(tb)
print("got: " + str(tb.output))
print("expected: " + str([value for value, keep in test_seq if keep]))
run_simulation(_CompacterPackerTB())
print("*** Testing downscaler core ***")
run_simulation(_DownscalerCoreTB())

Binary file not shown.

After

Width:  |  Height:  |  Size: 49 KiB