Source code for monochrome.ipc

import os
import socket
import subprocess
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional, Text, Union

import flatbuffers
import numpy as np

from .fbs import (
    Array3DataChunkf,
    Array3DataChunku8,
    Array3DataChunku16,
    Array3Meta,
    Array3MetaFlow,
    CloseVideo,
    CloseAllVideos,
    Filepaths,
    PointsVideo,
    Root,
    SetPlaybackSpeed,
    VideoExport,
)
from .fbs.ArrayDataType import ArrayDataType
from .fbs.BitRange import BitRange
from .fbs.Color import CreateColor
from .fbs.ColorMap import ColorMap
from .fbs.Data import Data
from .fbs.DictEntry import DictEntryAddKey, DictEntryAddVal, DictEntryEnd, DictEntryStart
from .fbs.OpacityFunction import OpacityFunction
from .fbs.VideoExportFormat import VideoExportFormat

if sys.platform == "win32":
    MONOCHROME_BIN_PATH = Path(__file__).parent / "data" / "bin" / "Monochrome.exe"
elif sys.platform == "darwin":
    MONOCHROME_BIN_PATH = Path(__file__).parent / "data" / "Monochrome.app"
else:
    MONOCHROME_BIN_PATH = Path(__file__).parent / "data" / "bin" / "Monochrome"

USE_TCP = sys.platform in ["win32", "cygwin"]
TCP_IP, TCP_PORT = "127.0.0.1", 4864
# OSX doesn't support abstract UNIX domain sockets
ABSTRACT_DOMAIN_SOCKET_SUPPORTED = sys.platform != "darwin"
if sys.platform != "win32":
    SOCK_PATH = f"\0Monochrome{os.getuid()}" if ABSTRACT_DOMAIN_SOCKET_SUPPORTED else f"/tmp/Monochrome{os.getuid()}.s"
else:
    SOCK_PATH = None
MAX_BUFFER_SIZE = 16352
MONOCHROME_DEFAULT_ARGS = {}


def start_monochrome(speed: Optional[float] = None,
                     display_fps: Optional[int] = None,
                     scale: Optional[float] = None,
                     fliph: bool = False,
                     flipv: bool = False,
                     **kwargs):
    """Start bundled Monochrome executable with the given settings."""
    args = []
    if speed:
        args.append("--speed")
        args.append(str(speed))
    if display_fps:
        args.append("--display_fps")
        args.append(str(display_fps))
    if scale:
        args.append("--scale")
        args.append(str(scale))
    if fliph:
        args.append("--fliph")
    if flipv:
        args.append("--flipv")
    kwargs = {**MONOCHROME_DEFAULT_ARGS, **kwargs}
    for key, val in kwargs.items():
        args.append(f"--{key}")
        if isinstance(val, bool):
            pass
        else:
            args.append(str(val))

    if sys.platform == "darwin" and '--unit-test-mode' in args:
        cmd = [str(MONOCHROME_BIN_PATH / 'Contents' / 'MacOS' / 'Monochrome'), ] + args
    elif sys.platform == "darwin":
        cmd = ["open", "-a", str(MONOCHROME_BIN_PATH), "--args", ] + args
    else:
        cmd = [str(MONOCHROME_BIN_PATH), ] + args
    subprocess.Popen(cmd, start_new_session=True)


def console_entrypoint():
    if sys.platform != "darwin":
        args = [str(MONOCHROME_BIN_PATH)]
    else:
        args = ["open", "-a", str(MONOCHROME_BIN_PATH), "--args"]
    args.extend(sys.argv[1:])
    subprocess.Popen(args).wait()


def _create_socket():
    if USE_TCP:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((TCP_IP, TCP_PORT))
    else:
        if not ABSTRACT_DOMAIN_SOCKET_SUPPORTED and not Path(SOCK_PATH).exists():
            raise ConnectionRefusedError("No socket found, please start Monochrome")
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        s.connect(SOCK_PATH)
    return s


def create_socket():
    s = None
    try:
        s = _create_socket()
    except ConnectionRefusedError:
        if not USE_TCP and not ABSTRACT_DOMAIN_SOCKET_SUPPORTED:
            Path(SOCK_PATH).unlink(missing_ok=True)
        start_monochrome()
        waiting = True
        timeout = time.time() + 5
        while waiting and time.time() < timeout:
            try:
                s = _create_socket()
                waiting = False
            except ConnectionRefusedError:
                pass
        if waiting:
            raise ConnectionRefusedError("Could not connect to Monochrome")
    return s


def get_color(builder, color):
    if color is None:
        return None

    try:
        import matplotlib.colors as mcolors
        color = mcolors.to_rgba(color)
    except ImportError:
        if isinstance(color, str):
            print("ERROR: unable to import matplotlib, please install it")
            color = (0.0, 0.0, 0.0, 1.0)
    return CreateColor(builder, color)


def build_root(builder, data_type, data):
    Root.Start(builder)
    Root.AddDataType(builder, data_type)
    Root.AddData(builder, data)
    root = Root.End(builder)
    return root


def create_filepaths_msg(paths):
    builder = flatbuffers.Builder(512)
    paths_fb = [builder.CreateString(s) for s in paths]
    Filepaths.StartFileVector(builder, len(paths_fb))
    for p in paths_fb:
        builder.PrependSOffsetTRelative(p)
    vec = builder.EndVector()
    Filepaths.Start(builder)
    Filepaths.AddFile(builder, vec)
    fp = Filepaths.End(builder)
    root = build_root(builder, Data.Filepaths, fp)
    builder.FinishSizePrefixed(root)
    buf = builder.Output()
    return buf


def create_pointsvideo_msg(points_py, name, parent_name=None, color=None, point_size=None):
    builder = flatbuffers.Builder(1024)
    name_fb = builder.CreateString(name)
    parent_fb = builder.CreateString(parent_name) if parent_name else None

    flat = []
    indexes = []
    for frame in points_py:
        for point in frame:
            flat.append(point[0])
            flat.append(point[1])
        indexes.append(len(flat))
    flat = np.array(flat, dtype=np.float32)
    indexs = np.array(indexes, dtype=np.uint32)
    flat_fb = builder.CreateNumpyVector(flat)
    indexes_fb = builder.CreateNumpyVector(indexs)

    PointsVideo.Start(builder)
    PointsVideo.AddName(builder, name_fb)
    if parent_fb:
        PointsVideo.AddParentName(builder, parent_fb)
    if color:
        PointsVideo.AddColor(builder, get_color(builder, color))
    if point_size:
        PointsVideo.AddPointSize(builder, point_size)
    PointsVideo.AddPointsData(builder, flat_fb)
    PointsVideo.AddTimeIdxs(builder, indexes_fb)
    fp = PointsVideo.End(builder)
    root = build_root(builder, Data.PointsVideo, fp)
    builder.FinishSizePrefixed(root)
    buf = builder.Output()
    return buf

def create_array3meta_msg(dtype: ArrayDataType, name, shape, duration=0., fps=0., date="", comment="",
                          bitrange=BitRange.AUTODETECT, cmap=ColorMap.DEFAULT, parent_name=None, opacity=None,
                          metadata=None, vmin=None, vmax=None):
    builder = flatbuffers.Builder(1024)
    name_fb = builder.CreateString(name)
    parent_fb = builder.CreateString(parent_name) if parent_name is not None else None
    date_fb = builder.CreateString(date)
    comment_fb = builder.CreateString(comment)
    if metadata:
        metadata = [(builder.CreateString(key), builder.CreateString(str(val))) for key, val in metadata.items()]
        metaData_fbs = []
        for key, val in metadata:
            DictEntryStart(builder)
            DictEntryAddKey(builder, key)
            DictEntryAddVal(builder, val)
            metaData_fbs.append(DictEntryEnd(builder))
        Array3Meta.Array3MetaStartMetadataVector(builder, len(metadata))
        for e in metaData_fbs:
            builder.PrependUOffsetTRelative(e)
        metadata = builder.EndVector()
    Array3Meta.Start(builder)
    Array3Meta.AddType(builder, dtype)
    Array3Meta.AddNx(builder, shape[2])
    Array3Meta.AddNy(builder, shape[1])
    Array3Meta.AddNt(builder, shape[0] * shape[3])
    Array3Meta.AddBitrange(builder, bitrange)
    Array3Meta.AddCmap(builder, cmap)
    if vmin is not None:
        Array3Meta.AddVmin(builder, vmin)
    if vmax is not None:
        Array3Meta.AddVmax(builder, vmax)
    if opacity is not None:
        Array3Meta.AddOpacity(builder, opacity)
    Array3Meta.AddName(builder, name_fb)
    if parent_fb:
        Array3Meta.AddParentName(builder, parent_fb)
    Array3Meta.AddDuration(builder, duration)
    Array3Meta.AddFps(builder, fps)
    Array3Meta.AddDate(builder, date_fb)
    Array3Meta.AddComment(builder, comment_fb)
    if metadata:
        Array3Meta.AddMetadata(builder, metadata)
    Array3Meta.AddNc(builder, shape[3])
    d = Array3Meta.End(builder)

    root = build_root(builder, Data.Array3Meta, d)
    builder.FinishSizePrefixed(root)
    buf = builder.Output()
    return buf


def create_array3metaflow_msg(shape, parent_name=None, name="", color=None):
    if parent_name is None:
        parent_name = ""
    builder = flatbuffers.Builder(1024)
    name_fb = builder.CreateString(name)
    parent_fb = builder.CreateString(parent_name)
    Array3MetaFlow.Start(builder)
    Array3MetaFlow.AddNx(builder, shape[2])
    Array3MetaFlow.AddNy(builder, shape[1])
    Array3MetaFlow.AddNt(builder, shape[0])
    Array3MetaFlow.AddName(builder, name_fb)
    Array3MetaFlow.AddParentName(builder, parent_fb)
    if color:
        Array3MetaFlow.AddColor(builder, get_color(builder, color))
    d = Array3MetaFlow.End(builder)

    root = build_root(builder, Data.Array3MetaFlow, d)
    builder.FinishSizePrefixed(root)
    buf = builder.Output()
    return buf


def create_array3dataf_msg(array, idx=0):
    builder = flatbuffers.Builder(65536)
    data = builder.CreateNumpyVector(array)
    Array3DataChunkf.Start(builder)
    Array3DataChunkf.AddStartidx(builder, idx)
    Array3DataChunkf.AddData(builder, data)
    d = Array3DataChunkf.End(builder)

    root = build_root(builder, Data.Array3DataChunkf, d)
    builder.FinishSizePrefixed(root)
    buf = builder.Output()
    return buf


def create_array3datau8_msg(array, idx=0):
    builder = flatbuffers.Builder(65536)
    data = builder.CreateNumpyVector(array)
    Array3DataChunku8.Start(builder)
    Array3DataChunku8.AddStartidx(builder, idx)
    Array3DataChunku8.AddData(builder, data)
    d = Array3DataChunku8.End(builder)

    root = build_root(builder, Data.Array3DataChunku8, d)
    builder.FinishSizePrefixed(root)
    buf = builder.Output()
    return buf


def create_array3datau16_msg(array, idx=0):
    builder = flatbuffers.Builder(65536)
    data = builder.CreateNumpyVector(array)
    Array3DataChunku16.Start(builder)
    Array3DataChunku16.AddStartidx(builder, idx)
    Array3DataChunku16.AddData(builder, data)
    d = Array3DataChunku16.End(builder)

    root = build_root(builder, Data.Array3DataChunku16, d)
    builder.FinishSizePrefixed(root)
    buf = builder.Output()
    return buf


[docs] def show_file(filepath: Union[Text, Path]): """ Load a file in Monochrome. Parameters ---------- filepath : str or Path Path to the file to be loaded """ show_files([filepath])
[docs] def show_files(paths: List[Union[Text, Path]]): """ Load multiple files in Monochrome. Parameters ---------- paths : List[str or Path] List of paths to the files to be loaded """ paths = [Path(path) for path in paths] if not all([path.exists() for path in paths]): raise FileNotFoundError(f"One of more files of {paths} do not exist") paths = [str(path.absolute()) for path in paths] s = create_socket() buf = create_filepaths_msg(paths) s.sendall(buf)
[docs] def show_points(points, name: Text = "", parent: Optional[Text] = None, color=None, point_size: Optional[float] = None): """ Show a list of points for each frame in Monochrome. Parameters ---------- points : List[List[Tuple[float, float]]] A list of list of points (x, y). The outer list elements are the frames, the inner list is the list of points for a specific frame. name : str Optional description parent : str Name of the video onto which the points will be displayed. If none is given the last loaded video will be used. color : str or tuple Matplotlib color (either string like 'black' or rgba tuple) point_size : float Size of points in image pixels """ name = str(name) s = create_socket() buf = create_pointsvideo_msg(points, name, parent, color, point_size) s.sendall(buf)
[docs] def show_image(array: np.ndarray, name: Text = "", cmap: Union[ColorMap, Text] = ColorMap.DEFAULT, vmin: Optional[float] = None, vmax: Optional[float] = None, bitrange: Union[BitRange, Text] = BitRange.AUTODETECT, parent: Optional[Text] = None, opacity: Optional[OpacityFunction] = None, comment: Text = "", metadata: Optional[Dict] = None): """ Show an image in Monochrome. Alias for :func:`show_video`. """ return show_video(array, name=name, cmap=cmap, vmin=vmin, vmax=vmax, bitrange=bitrange, parent=parent, opacity=opacity, comment=comment, metadata=metadata)
[docs] def show_video(array: np.ndarray, name: Text = "", cmap: Union[ColorMap, Text] = ColorMap.DEFAULT, vmin: Optional[float] = None, vmax: Optional[float] = None, bitrange: Union[BitRange, Text] = BitRange.AUTODETECT, parent: Optional[Text] = None, opacity: Optional[OpacityFunction] = None, comment: Text = "", metadata: Optional[Dict] = None): """Play a video or open a image in Monochrome. Arrays of dtype np.float, np.uint8, and np.uint16 are natively supported by Monochrome. Arrays with other dtypes will be converted to np.float32. Parameters ---------- array : np.ndarray The video to be displayed. The array should have the shape (T, H, W) or (H, W). name : str Name of the video cmap : str or ColorMap Colormap for the video. One of 'default' (autodetect), 'gray', 'hsv', 'blackbody', 'viridis', 'PRGn', 'PRGn_pos', 'PRGn_neg', 'RdBu', 'tab10'. vmin : float Minimum value for the colormap. Default is None. vmax : float Maximum value for the colormap. Default is None. bitrange : str or BitRange Valuerange for the video. One of 'autodetect', 'MinMax' 'uint8', 'uint10', 'uint12', 'uint16', 'float' (for [0,1]), 'diff' (for [-1, 1]), 'phase' (for [0, 2*pi]), or 'phase_diff (for [-pi, pi])'. Default is 'autodetect'. parent : str Name of the parent video opacity : OpacityFunction Opacity function for alpha blending if video is a layer. One of 'linear', 'linear_r', 'centered', 1.0, 0.75, 0.5, 0.25, or 0.0. Default is `opacity=1.0`. comment : str Comment to be displayed metadata : dict Additional metadata to be displayed """ array = np.squeeze(array) if array.ndim == 2: # assume that it is a 2D image array = array[np.newaxis, :, :, np.newaxis] elif array.ndim == 3: if array.shape[2] in (3, 4): # assume that it is an RGB(A) image if array.shape[2] == 4: # alpha channel is not yet supported array = array[..., :3] array = np.expand_dims(array, 0) else: array = np.expand_dims(array, -1) elif array.ndim == 4: if array.shape[3] not in (3, 4): msg = f"Video has an unsupported shape {array.shape}. Four dimensional arrays are only supported if the last dimension is 3 (RGB) or 4 (RGBA)." raise ValueError(msg) else: msg = f"Array does not have an image/video shape: Shape {array.shape}" raise ValueError(msg) if array.dtype == np.float32: dtype = ArrayDataType.FLOAT elif array.dtype == np.uint8: dtype = ArrayDataType.UINT8 elif array.dtype == np.uint16: dtype = ArrayDataType.UINT16 else: if np.iscomplexobj(array): raise ValueError("Complex arrays not supported") else: array = array.astype(np.float32) dtype = ArrayDataType.FLOAT name = str(name) if isinstance(cmap, str): cmap = getattr(ColorMap, cmap.upper()) if isinstance(bitrange, str): bitrange = getattr(BitRange, bitrange.upper()) if opacity is not None: if isinstance(opacity, str): if opacity.upper() in ("LINEAR", "LINEAR_R", "CENTERED"): opacity = getattr(OpacityFunction, opacity.upper()) else: try: opacity = float(opacity) except ValueError: raise ValueError(f"Invalid opacity value: '{opacity}'. Supported values are 'linear', 'linear_r', 'centered', or a float between 0.0 and 1.0.") if isinstance(opacity, (int, float)): if not (0.0 <= opacity <= 1.0): raise ValueError(f"Opacity ({opacity}) must be between 0.0 and 1.0.") # Find the closest supported float value supported_floats = [0.0, 0.25, 0.5, 0.75, 1.0] closest_float = min(supported_floats, key=lambda x: abs(x - opacity)) mapping = { 1.0: OpacityFunction.FIXED_100, 0.75: OpacityFunction.FIXED_75, 0.5: OpacityFunction.FIXED_50, 0.25: OpacityFunction.FIXED_25, 0.0: OpacityFunction.FIXED_0, } opacity = mapping[closest_float] s = create_socket() buf = create_array3meta_msg(dtype, name, array.shape, comment=comment, bitrange=bitrange, cmap=cmap, parent_name=parent, opacity=opacity, metadata=metadata, vmin=vmin, vmax=vmax) s.sendall(buf) flat = array.flatten() length = flat.size max_size = MAX_BUFFER_SIZE for idx in range(0, length, max_size): end = length if idx + max_size > length else idx + max_size if array.dtype == np.float32: buf = create_array3dataf_msg(flat[idx:end], idx) elif array.dtype == np.uint8: buf = create_array3datau8_msg(flat[idx:end], idx) elif array.dtype == np.uint16: buf = create_array3datau16_msg(flat[idx:end], idx) else: raise NotImplementedError("Unkown dtype") s.sendall(buf)
[docs] def show_layer(array: np.ndarray, name: Text = "", parent: Optional[Text] = None, opacity: Optional[OpacityFunction] = None, **kwargs): """ Add a layer to the parent video in Monochrome. Parameters ---------- array : np.ndarray The layer to be displayed. The array should have the shape (T, H, W) or (H, W). name : str Name of the layer parent : str Name of the parent video, if None the last loaded video will be used opacity : OpacityFunction Opacity function for alpha blending. One of 'linear', 'linear_r', 'centered', 1.0, 0.75, 0.5, 0.25, or 0.0. Default is `opacity=1.0`. kwargs : dict Additional arguments to be passed to :func:`show_video` """ if parent is None: parent = "" show_video(array, name=name, parent=parent, opacity=opacity, **kwargs)
[docs] def show_flow(flow_uv: np.ndarray, name: Text = "", parent: Optional[Text] = None, color=None): """ Visualize optical flow in Monochrome. Parameters ---------- flow_uv : np.ndarray Optical flow field of shape (T, H, W, 2) name : str Name of the flow parent : str Name of the parent video, if None the last loaded video will be used color : str or tuple Matplotlib color (either string like 'black' or rgba tuple) """ if flow_uv.ndim != 4: raise ValueError("array is not four-dimensional") if flow_uv.dtype != np.float32: raise ValueError("array is not floating type") if flow_uv.shape[3] != 2: raise ValueError("flow should be of shape [T, H, W, 2]") name = str(name) s = create_socket() shape = (flow_uv.shape[0] * 2, flow_uv.shape[1], flow_uv.shape[2]) buf = create_array3metaflow_msg(shape, parent, name, color) s.sendall(buf) flat = flow_uv.flatten() length = flat.size max_size = MAX_BUFFER_SIZE for idx in range(0, length, max_size): end = length if idx + max_size > length else idx + max_size buf = create_array3dataf_msg(flat[idx:end], idx) s.sendall(buf)
[docs] def show(array_or_path: Union[str, Path, np.ndarray], *args, **kwargs): """Autodetect the type of the input and show it in Monochrome.""" if isinstance(array_or_path, np.ndarray): if array_or_path.ndim == 4 and array_or_path.shape[3] == 2: return show_flow(array_or_path, *args, **kwargs) else: return show_video(array_or_path, *args, **kwargs) elif isinstance(array_or_path, str) or isinstance(array_or_path, Path): return show_file(array_or_path) else: raise ValueError("array_or_path has to be numpy array or string")
[docs] def export_video(filepath, name="", fps=30, t_start=0, t_end=-1, description="", close_after_completion=False): """Export a video displayed in Monochrome to a .mp4 file. .. note:: Monochrome exports the video as rendered in the window, i.e. the video will have the same resolution as the video window and all the layers/points/... will be merged into a single video. Parameters ---------- filepath : str Path to the output .mp4 file name : str Name of the video to be exported fps : int Frames per second of the output video t_start : int Start frame of the output video t_end : int End frame of the output video, -1 for the last frame description : str Description of the video to embed in the .mp4 file close_after_completion : bool Close the video in Monochrome after the export is completed """ s = create_socket() builder = flatbuffers.Builder(512) name_fb = builder.CreateString(name) filepath_fb = builder.CreateString(str(Path(filepath).absolute())) description_fb = builder.CreateString(description) VideoExport.Start(builder) VideoExport.AddRecording(builder, name_fb) VideoExport.AddFilepath(builder, filepath_fb) VideoExport.AddDescription(builder, description_fb) VideoExport.AddFormat(builder, VideoExportFormat.FFMPEG) VideoExport.AddFps(builder, fps) VideoExport.AddTStart(builder, t_start) VideoExport.AddTEnd(builder, t_end) VideoExport.AddCloseAfterCompletion(builder, close_after_completion) fp = VideoExport.End(builder) root = build_root(builder, Data.VideoExport, fp) builder.FinishSizePrefixed(root) buf = builder.Output() s.sendall(buf)
[docs] def close_video(name=""): """Close a video in Monochrome. Parameters ---------- name : str Name of the video to be closed. If empty, the last loaded video will be closed. """ s = create_socket() builder = flatbuffers.Builder(512) name_fb = builder.CreateString(name) CloseVideo.Start(builder) CloseVideo.AddName(builder, name_fb) fp = CloseVideo.End(builder) root = build_root(builder, Data.CloseVideo, fp) builder.FinishSizePrefixed(root) buf = builder.Output() s.sendall(buf)
[docs] def set_playback_speed(speed: float): """Set the playback speed for all videos in Monochrome. Parameters ---------- speed : float Playback speed. 1.0 is normal speed, 0.0 is paused, 2.0 is double speed, etc. """ s = create_socket() builder = flatbuffers.Builder(512) SetPlaybackSpeed.Start(builder) SetPlaybackSpeed.AddSpeed(builder, float(speed)) fp = SetPlaybackSpeed.End(builder) root = build_root(builder, Data.SetPlaybackSpeed, fp) builder.FinishSizePrefixed(root) buf = builder.Output() s.sendall(buf)
[docs] def close_all_videos(): """Close all videos currently open in Monochrome.""" s = create_socket() builder = flatbuffers.Builder(512) CloseAllVideos.Start(builder) fp = CloseAllVideos.End(builder) root = build_root(builder, Data.CloseAllVideos, fp) builder.FinishSizePrefixed(root) buf = builder.Output() s.sendall(buf)
[docs] def quit(): # noqa: A001 """Quit Monochrome, terminating the process.""" s = create_socket() builder = flatbuffers.Builder(512) root = build_root(builder, Data.Quit, 0) builder.FinishSizePrefixed(root) buf = builder.Output() s.sendall(buf)