|
| 1 | +from multiprocessing import Pipe |
| 2 | +from pathlib import Path |
| 3 | +from subprocess import PIPE, Popen |
| 4 | +import sys |
| 5 | + |
| 6 | +from discopy.frobenius import Category, Functor, Ty, Box |
| 7 | +from discopy.frobenius import Hypergraph as H, Id, Spider |
| 8 | +from discopy import python |
| 9 | + |
| 10 | +from .files import stream_diagram |
| 11 | + |
| 12 | + |
| 13 | + |
| 14 | +class IORun(python.Function): |
| 15 | + @classmethod |
| 16 | + def spiders(cls, n_legs_in: int, n_legs_out: int, typ: Ty): |
| 17 | + def step(*processes: IOProcess): |
| 18 | + """ |
| 19 | + A many-to-many pipe. |
| 20 | + """ |
| 21 | + assert typ == Ty("io") |
| 22 | + assert len(processes) == n_legs_in |
| 23 | + return tuple( |
| 24 | + IOSpiderProcess(*processes) |
| 25 | + for _ in range(n_legs_out)) |
| 26 | + return IORun( |
| 27 | + inside=step, |
| 28 | + dom=Ty(*("io" for _ in range(n_legs_in))), |
| 29 | + cod=Ty(*("io" for _ in range(n_legs_out)))) |
| 30 | + |
| 31 | + |
| 32 | +def command_io_f(diagram): |
| 33 | + """ |
| 34 | + close input parameters (constants) |
| 35 | + drop outputs matching input parameters |
| 36 | + all boxes are io->[io]""" |
| 37 | + def command_io(b): |
| 38 | + return Id().tensor(*( |
| 39 | + Id("io") @ Spider(0, 1, t) >> |
| 40 | + Box(b.name, |
| 41 | + Ty("io") @ t, |
| 42 | + Ty("io")) >> |
| 43 | + H.spiders(1, len(b.cod), Ty("io")).to_diagram() |
| 44 | + for t in b.dom)) |
| 45 | + f = Functor( |
| 46 | + lambda x: Ty("io"), |
| 47 | + lambda b: command_io(b),) |
| 48 | + diagram = f(diagram) |
| 49 | + return ( |
| 50 | + # H.spiders(0, len(diagram.dom), Ty("io")).to_diagram() >> |
| 51 | + diagram >> |
| 52 | + H.spiders(len(diagram.cod), 1, Ty("io")).to_diagram()) |
| 53 | + |
| 54 | + |
| 55 | +class IOProcess: |
| 56 | + """mutual with IOSpiderProcess""" |
| 57 | + def __init__(self, box, *spider_processes): |
| 58 | + assert box.dom[0] == Ty("io") |
| 59 | + assert box.cod == Ty("io") |
| 60 | + self.spider_processes = spider_processes |
| 61 | + self.popen_args = [box.name, *(t.name for t in box.dom[1:])] |
| 62 | + self.pipe_in, self.pipe_out = Pipe(duplex=False) |
| 63 | + |
| 64 | + def communicate(self, input): |
| 65 | + # print("process:", self.spider_processes) |
| 66 | + # i = self.pipe_in.recv() |
| 67 | + o = "" |
| 68 | + for p in self.spider_processes: |
| 69 | + o += p.communicate(input) |
| 70 | + # print(o, self.popen_args) |
| 71 | + popen = Popen(self.popen_args, stdin=PIPE, stdout=PIPE, text=True) |
| 72 | + (o, _) = popen.communicate(o) |
| 73 | + # print(o, self.popen_args) |
| 74 | + return o |
| 75 | + |
| 76 | + |
| 77 | +class IOSpiderProcess: |
| 78 | + """ |
| 79 | + takes many processes and creates n copies of their joined outputs |
| 80 | + mutual with IOSpider""" |
| 81 | + def __init__(self, *processes: IOProcess): |
| 82 | + self.processes = processes |
| 83 | + self.pipe_in, self.pipe_out = Pipe(duplex=False) |
| 84 | + |
| 85 | + def communicate(self, input): |
| 86 | + # print("spiderprocess:", [p.popen for p in self.processes]) |
| 87 | + w = "" |
| 88 | + for p in self.processes: |
| 89 | + o = p.communicate(input) |
| 90 | + w += o |
| 91 | + return w |
| 92 | + |
| 93 | +shell_f = Functor( |
| 94 | + lambda x: Ty("io"), |
| 95 | + lambda b: lambda *p: IOProcess(b, *p), |
| 96 | + cod=Category(Ty, IORun)) |
| 97 | + |
| 98 | + |
| 99 | +def widish_main(file_name): |
| 100 | + path = Path(file_name) |
| 101 | + diagram = stream_diagram(path.open()) |
| 102 | + diagram = command_io_f(diagram) |
| 103 | + diagram.draw(path=path.with_suffix(".jpg")) |
| 104 | + process: IOSpiderProcess = shell_f(diagram)() |
| 105 | + r = process.communicate("") |
| 106 | + sys.stdout.write(r) |
0 commit comments