PoserTalkr.py
from __future__ import print_function

import sys

try:
    import poser
except ImportError:
    poser = SCENE = None
    raise RuntimeWarning("Script should run in Poser-Python.")
else:
    SCENE = poser.Scene()

import os
import socket
import errno
import threading
import time
from collections import deque, OrderedDict

# You may want do delete the following block.
# -------------------------------------------
THISPATH = os.path.abspath(os.path.dirname(sys.argv[0]))
if THISPATH not in sys.path:
    sys.path.append(THISPATH)
init = os.path.join(THISPATH, "__init__.py")
if not os.path.isfile(init):
    # make sure we can import from this path.
    with open(init, "wb"):
        print("\n")
# -------------------------------------------

try:
    from path2poserobj import object_from_path
    from PoserObject2Json import poserobject2json
except ImportError as err:
    print("\nPlease download from http://adp.spdns.org\n")
    raise err

__all__ = ["HOST", "PORT", "MAX_INBUF_SIZE", "SCENE", "DEL_CLIENT_QUEUES",
           "on_error", "stopflag", "frompath", "obj2json",
           "Dispatcher"]

frompath = object_from_path
obj2json = poserobject2json

HOST = "192.168.1.215"
PORT = 55555
MAX_INBUF_SIZE = 1024 * 8
MAX_QUEUE_SIZE = 100
DEL_CLIENT_QUEUES = False  # Delete data in queues if client disapears
Dispatcher = None
on_error = None
stopflag = threading.Event()

_queues_in = OrderedDict()
_queues_out = OrderedDict()
_tcpsocket = None
_clients = dict()
_threadlist = deque()


def get_queue_in(addr):
    return _queues_in.setdefault(addr, deque())


def get_queue_out(addr):
    return _queues_out.setdefault(addr, deque())


def get_queues(addr):
    return get_queue_in(addr), get_queue_out(addr)


def get_clientdata(addr):
    return _clients.setdefault(addr, ClientData())


def get_client_all(addr):
    return get_clientdata(addr), get_queue_in(addr), get_queue_out(addr)


def out_queue_empty(addr):
    return get_queue_out(addr).__len__() == 0


def in_queue_empty(addr):
    return get_queue_in(addr).__len__() == 0


def threadlist_empty():
    return _threadlist.__len__() == 0


def get_new_client_thread(addr, *args, **kwargs):
    for idx in range(_threadlist.__len__() - 1, 0, -1):
        # Going from top to 0 makes it easier to
        # remove items.
        t = _threadlist[idx]
        if isinstance(t, ClientThread) and t.addr == addr:
            if not t.stopped.is_set():
                t.stopped.set()
                t.stopped.wait(1)
            if t.is_alive():
                t.join(1)
            del _threadlist[idx]

    return ClientThread(addr, *args, **kwargs)


def _clear_threadlist():
    for i in range(_threadlist.__len__()):
        if isinstance(_threadlist[i], threading.Thread) \
                and hasattr(_threadlist[i], "stop"):
            _threadlist[i].stop()

    while _threadlist.__len__():
        t = _threadlist.pop()
        try:
            if isinstance(t, threading.Thread):
                t.join(3)
                if t.is_alive():
                    raise UserWarning("Thread wan't stop: %s" % t)
        except Exception:
            pass


def cleanup(del_queues=DEL_CLIENT_QUEUES):
    _clear_threadlist()
    if del_queues:
        for addr, queue in _queues_in.items():
            while queue.__len__() > 0:
                q = queue.pop()
                print(q)

        for addr, queue in _queues_out.items():
            while queue.__len__() > 0:
                queue.pop()


def _setSocket(sock_family=socket.AF_UNSPEC, sock_type=socket.SOCK_STREAM):
    # setup TCP
    global _tcpsocket, HOST, PORT

    if sock_family == 6:
        sock_family = socket.AF_INET6
    elif sock_family == 4:
        sock_family = socket.AF_INET

    if HOST is None:
        HOST = socket.getaddrinfo(None, 0, sock_family)[-1][-1][0]

    for res in socket.getaddrinfo(HOST, PORT, sock_family,
                                  sock_type, 0, socket.AI_PASSIVE):
        af, socktype, proto, canonname, sa = res
        try:
            _tcpsocket = socket.socket(af, socktype, proto)
        except socket.error as e:
            _tcpsocket = None
            continue
        else:
            _tcpsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        try:
            _tcpsocket.bind(sa)
            _tcpsocket.listen(1)
        except socket.error:
            _tcpsocket.close()
            _tcpsocket = None
            continue
        break

    return _tcpsocket


# -------------------------------------------------------------------
# Classes
# -------------------------------------------------------------------


class ClientData(object):
    NUMFLAGS = 3
    __slots__ = "socket", "globals", "locals", \
                "_flags", "_errors", "_busy", "_lastseen"

    def __init__(self):
        for kw in self.__slots__:
            setattr(self, kw, None)
        self.reset()

    def reset(self):
        self._lastseen = [0, 0]
        self._flags = [threading.Event()] * self.NUMFLAGS
        self._busy = threading.Event()
        self._errors = deque(maxlen=20)
        self.globals = dict(poser=poser, SCENE=SCENE,
                            frompath=object_from_path,
                            obj2json=poserobject2json
                            )
        self.locals = {}

    def add_error(self, e):
        self._errors.append((time.asctime(), e))

    def is_busy(self):
        return self._busy.is_set()

    def set_busy(self, v):
        if bool(v):
            self._busy.set()
        else:
            self._busy.clear()

    def flag(self, nr):
        return self._flags[min(self.NUMFLAGS, max(0, nr))].is_set()

    flag_is_set = flag

    def flags(self):
        return [f.is_set() for f in self._flags]

    def any_flag(self):
        return any(self.flags())

    any_flag_is_set = any_flag

    def set_flag(self, nr, v):
        nr = max(0, min(nr, self.NUMFLAGS))
        if bool(v):
            self._flags[nr].set()
        else:
            self._flags[nr].clear()

    @property
    def num_errors(self):
        return len(self._errors)

    @property
    def last_error(self):
        return self._errors[-1]

    @property
    def lastseen_in(self):
        return self._lastseen[0]

    @lastseen_in.setter
    def lastseen_in(self, v):
        if v > self._lastseen[0]:
            self._lastseen[0] = v

    @property
    def lastseen_out(self):
        return self._lastseen[1]

    @lastseen_out.setter
    def lastseen_out(self, v):
        if v > self._lastseen[1]:
            self._lastseen[1] = v

    @property
    def lastseen(self):
        return max(self._lastseen[0], self._lastseen[1])


class BaseThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        self.stopped = threading.Event()
        super(BaseThread, self).__init__(*args, **kwargs)
        _threadlist.append(self)

    def _write(self, *args):
        if hasattr(self, "connection") \
                and isinstance(self.connection, socket.socket):
            for arg in args:
                if self.stopped.is_set():
                    break
                self.connection.settimeout(1)
                try:
                    self.last_connection.sendall(str(arg))
                except socket.timeout:
                    continue
                except socket.error:
                    return

    def stop(self):
        self.stopped.set()

    def __del__(self):
        try:
            _threadlist.remove(self)
        except ValueError:
            pass


class ClientThread(BaseThread):
    FLAGNR_QUEUEFULL = 0
    # for creating web apps, we need to be able to close
    # connections after transfering each block.
    FLAGNR_KEEPALIVE = 1

    def __init__(self, addr, keepalive=True):
        print("CLIENT STARTED", addr)
        super(ClientThread, self).__init__()
        self.name = "Client_%s" % str(addr).replace(".", "_")
        self.connection = None
        self.addr = addr
        self.queue_in = get_queue_in(addr)
        self.queue_out = get_queue_out(addr)
        self.clientdata = get_clientdata(addr)
        self.clientdata.set_flag(self.FLAGNR_KEEPALIVE, keepalive)

    def __del__(self):
        if DEL_CLIENT_QUEUES and self.addr:
            _queues_out.pop(self.addr, None)
            _queues_in.pop(self.addr, None)
            _clients.pop(self.addr, None)

        if self in _threadlist:
            _threadlist.remove(self)

    def run(self):
        assert isinstance(self.connection, socket.socket)
        q_in = self.queue_in
        q_out = self.queue_out
        connection = self.connection
        cdata = self.clientdata
        timeout_counter = 0

        try:
            # catch any error -- even those fired while
            # the programmer made a mistake :)
            while timeout_counter < 20 \
                    and not self.stopped.is_set():
                cdata.set_flag(self.FLAGNR_QUEUEFULL,
                               q_in.__len__() >= MAX_QUEUE_SIZE)
                datablock = None

                if not cdata.flag_is_set(self.FLAGNR_QUEUEFULL):
                    connection.settimeout(.1)
                    try:
                        datablock = connection.recv(MAX_INBUF_SIZE)
                    except socket.timeout:
                        timeout_counter += 1
                        if q_out.__len__() == 0:
                            time.sleep(.3)
                    except socket.error as e:
                        # close connection on any error but timeout.
                        cdata.add_error(e)
                        self.stopped.set()
                        break

                if datablock:
                    # we got something
                    timeout_counter = 0
                    q_in.append(datablock)
                    cdata.lastseen_in = time.time()
                    # As long as we receive data we do not send anything
                    # because we want flood the client.
                    continue

                # No incomming data at this point.
                # Lets see if we have something to send.
                connection.settimeout(2)
                while q_out.__len__() > 0:
                    try:
                        connection.sendall(q_out[0])
                    except socket.timeout:
                        timeout_counter += 1
                        # Before we stick to long to this,
                        # we better leave and try later.
                        break  # stop sending
                    except socket.error as e:
                        # unrecoverable error
                        cdata.add_error(e)
                        self.stopped.set()
                        break
                    else:
                        # successfully sent!
                        q_out.popleft()
                        cdata.lastseen_out = time.time()
                        timeout_counter = 0
                        if not cdata.flag_is_set(self.FLAGNR_KEEPALIVE):
                            # seems to be http transfer
                            self.stopped.set()

            # forced stop
            print("Client", self.addr, "closed.")
            try:
                if not cdata.flag_is_set(self.FLAGNR_KEEPALIVE):
                    # Using shutdown allows the client to reconnect
                    # with just the same conditions as before.
                    self.connection.shutdown(socket.SHUT_RDWR)
            except Exception:
                pass
            self.connection = None
            _threadlist.remove(self)

        # global error-catcher
        except Exception as err:
            # whatever the reason for this error:
            # try to send it to the connection.
            self._write("UNECPECTED ERROR:", err)


class PoserInterpreterThread(BaseThread):
    """ 
    Thread interpreting incomming data. 
    """

    def __init__(self):
        super(PoserInterpreterThread, self).__init__()
        self.name = "PoserInterpreter"

    def run(self):
        while not self.stopped.is_set():
            if _queues_in.__len__() == 0:
                time.sleep(.3)
                continue

            result = None

            for addr, queue in _queues_in.items():
                clientdata = get_clientdata(addr)
                assert isinstance(queue, deque)
                while queue.__len__():
                    # compute incoming
                    block = str(queue.popleft())
                    try:
                        if block.startswith("="):
                            result = eval(block[1:], clientdata.globals, clientdata.locals)
                        elif block.startswith("%"):
                            result = object_from_path(block[1:])
                        else:
                            if "\n" in block:
                                block = compile(block)
                            exec (block, clientdata.globals, clientdata.locals)
                    except Exception as e:
                        _queues_out[addr].append(str(e))

                    # send back result if any
                    if result is not None:
                        _queues_out[addr].append(str(result))


class DispatcherThread(BaseThread):
    """ 
    Main thread generating all other threads. 
    """

    def __init__(self):
        super(DispatcherThread, self).__init__()
        self.name = "Dispatcher"
        self.sys_stdout_save = None
        self.last_connection = None

    def run(self):
        print(self.name, "STARTED")
        assert isinstance(_tcpsocket, socket.socket)
        interpreter = PoserInterpreterThread()
        is_http = PORT in (80, 443, 8080)
        if is_http:
            self.sys_stdout_save = sys.stdout

        while not stopflag.is_set() and not self.stopped.is_set():
            _tcpsocket.settimeout(1)
            try:
                connection, addr = _tcpsocket.accept()
            except socket.timeout:
                continue  # timout is fine here.
            except socket.error as e:
                print("SOCKET ERROR in Dispatcher")
                break

            addr = addr[0]  # we need the ip address
            keepalive = PORT not in (80, 443, 8080)
            if _clients.__len__() < 10:
                newclient = get_new_client_thread(addr, keepalive)
                newclient.connection = connection
                newclient.start()

                if not interpreter.is_alive():
                    interpreter.start()
            else:
                connection.settimeout(.2)
                try:
                    connection.sendall("To mutch clients. Try later.\n")
                except socket.error:
                    pass
                finally:
                    connection.close()
                print("Too mutch clients.")

        cleanup()
        _tcpsocket.shutdown(socket.SHUT_RDWR)
        print(self.name, "STOPPED")


# -------------------------------------------------------------------
# Start the show...
# -------------------------------------------------------------------

def main():
    global Dispatcher
    # Maybe there are fragments left from the last run
    # if the script stopped with an error and is re-started.

    cleanup()

    if Dispatcher.is_alive():
        if not Dispatcher.stopped.is_set():
            Dispatcher.stopped.set()
            Dispatcher.stopped.wait(timeout=10)
        if Dispatcher.is_alive():
            raise RuntimeWarning("Can't stop previously started Dispatcher")

    if isinstance(_tcpsocket, socket.socket):
        try:
            _tcpsocket.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass

    if _setSocket() is None:
        raise IOError("Can't create a socket for host %s, port %s." % (HOST, PORT))

    Dispatcher = DispatcherThread()
    Dispatcher.start()


if __name__ == "__main__":
    try:
        main()
    except Exception as err:
        print("Aborted with", err)