from __future__ import print_function import sys try: import poser except ImportError: poser = SCENE = None raise RuntimeWarning("Script should run in Poser-Python.") else: SCENE = poser.Scene() import os import socket import errno import threading import time from collections import deque, OrderedDict # You may want do delete the following block. # ------------------------------------------- THISPATH = os.path.abspath(os.path.dirname(sys.argv[0])) if THISPATH not in sys.path: sys.path.append(THISPATH) init = os.path.join(THISPATH, "__init__.py") if not os.path.isfile(init): # make sure we can import from this path. with open(init, "wb"): print("\n") # ------------------------------------------- try: from path2poserobj import object_from_path from PoserObject2Json import poserobject2json except ImportError as err: print("\nPlease download from http://adp.spdns.org\n") raise err __all__ = ["HOST", "PORT", "MAX_INBUF_SIZE", "SCENE", "DEL_CLIENT_QUEUES", "on_error", "stopflag", "frompath", "obj2json", "Dispatcher"] frompath = object_from_path obj2json = poserobject2json HOST = "192.168.1.215" PORT = 55555 MAX_INBUF_SIZE = 1024 * 8 MAX_QUEUE_SIZE = 100 DEL_CLIENT_QUEUES = False # Delete data in queues if client disapears Dispatcher = None on_error = None stopflag = threading.Event() _queues_in = OrderedDict() _queues_out = OrderedDict() _tcpsocket = None _clients = dict() _threadlist = deque() def get_queue_in(addr): return _queues_in.setdefault(addr, deque()) def get_queue_out(addr): return _queues_out.setdefault(addr, deque()) def get_queues(addr): return get_queue_in(addr), get_queue_out(addr) def get_clientdata(addr): return _clients.setdefault(addr, ClientData()) def get_client_all(addr): return get_clientdata(addr), get_queue_in(addr), get_queue_out(addr) def out_queue_empty(addr): return get_queue_out(addr).__len__() == 0 def in_queue_empty(addr): return get_queue_in(addr).__len__() == 0 def threadlist_empty(): return _threadlist.__len__() == 0 def get_new_client_thread(addr, *args, **kwargs): for idx in range(_threadlist.__len__() - 1, 0, -1): # Going from top to 0 makes it easier to # remove items. t = _threadlist[idx] if isinstance(t, ClientThread) and t.addr == addr: if not t.stopped.is_set(): t.stopped.set() t.stopped.wait(1) if t.is_alive(): t.join(1) del _threadlist[idx] return ClientThread(addr, *args, **kwargs) def _clear_threadlist(): for i in range(_threadlist.__len__()): if isinstance(_threadlist[i], threading.Thread) \ and hasattr(_threadlist[i], "stop"): _threadlist[i].stop() while _threadlist.__len__(): t = _threadlist.pop() try: if isinstance(t, threading.Thread): t.join(3) if t.is_alive(): raise UserWarning("Thread wan't stop: %s" % t) except Exception: pass def cleanup(del_queues=DEL_CLIENT_QUEUES): _clear_threadlist() if del_queues: for addr, queue in _queues_in.items(): while queue.__len__() > 0: q = queue.pop() print(q) for addr, queue in _queues_out.items(): while queue.__len__() > 0: queue.pop() def _setSocket(sock_family=socket.AF_UNSPEC, sock_type=socket.SOCK_STREAM): # setup TCP global _tcpsocket, HOST, PORT if sock_family == 6: sock_family = socket.AF_INET6 elif sock_family == 4: sock_family = socket.AF_INET if HOST is None: HOST = socket.getaddrinfo(None, 0, sock_family)[-1][-1][0] for res in socket.getaddrinfo(HOST, PORT, sock_family, sock_type, 0, socket.AI_PASSIVE): af, socktype, proto, canonname, sa = res try: _tcpsocket = socket.socket(af, socktype, proto) except socket.error as e: _tcpsocket = None continue else: _tcpsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: _tcpsocket.bind(sa) _tcpsocket.listen(1) except socket.error: _tcpsocket.close() _tcpsocket = None continue break return _tcpsocket # ------------------------------------------------------------------- # Classes # ------------------------------------------------------------------- class ClientData(object): NUMFLAGS = 3 __slots__ = "socket", "globals", "locals", \ "_flags", "_errors", "_busy", "_lastseen" def __init__(self): for kw in self.__slots__: setattr(self, kw, None) self.reset() def reset(self): self._lastseen = [0, 0] self._flags = [threading.Event()] * self.NUMFLAGS self._busy = threading.Event() self._errors = deque(maxlen=20) self.globals = dict(poser=poser, SCENE=SCENE, frompath=object_from_path, obj2json=poserobject2json ) self.locals = {} def add_error(self, e): self._errors.append((time.asctime(), e)) def is_busy(self): return self._busy.is_set() def set_busy(self, v): if bool(v): self._busy.set() else: self._busy.clear() def flag(self, nr): return self._flags[min(self.NUMFLAGS, max(0, nr))].is_set() flag_is_set = flag def flags(self): return [f.is_set() for f in self._flags] def any_flag(self): return any(self.flags()) any_flag_is_set = any_flag def set_flag(self, nr, v): nr = max(0, min(nr, self.NUMFLAGS)) if bool(v): self._flags[nr].set() else: self._flags[nr].clear() @property def num_errors(self): return len(self._errors) @property def last_error(self): return self._errors[-1] @property def lastseen_in(self): return self._lastseen[0] @lastseen_in.setter def lastseen_in(self, v): if v > self._lastseen[0]: self._lastseen[0] = v @property def lastseen_out(self): return self._lastseen[1] @lastseen_out.setter def lastseen_out(self, v): if v > self._lastseen[1]: self._lastseen[1] = v @property def lastseen(self): return max(self._lastseen[0], self._lastseen[1]) class BaseThread(threading.Thread): def __init__(self, *args, **kwargs): self.stopped = threading.Event() super(BaseThread, self).__init__(*args, **kwargs) _threadlist.append(self) def _write(self, *args): if hasattr(self, "connection") \ and isinstance(self.connection, socket.socket): for arg in args: if self.stopped.is_set(): break self.connection.settimeout(1) try: self.last_connection.sendall(str(arg)) except socket.timeout: continue except socket.error: return def stop(self): self.stopped.set() def __del__(self): try: _threadlist.remove(self) except ValueError: pass class ClientThread(BaseThread): FLAGNR_QUEUEFULL = 0 # for creating web apps, we need to be able to close # connections after transfering each block. FLAGNR_KEEPALIVE = 1 def __init__(self, addr, keepalive=True): print("CLIENT STARTED", addr) super(ClientThread, self).__init__() self.name = "Client_%s" % str(addr).replace(".", "_") self.connection = None self.addr = addr self.queue_in = get_queue_in(addr) self.queue_out = get_queue_out(addr) self.clientdata = get_clientdata(addr) self.clientdata.set_flag(self.FLAGNR_KEEPALIVE, keepalive) def __del__(self): if DEL_CLIENT_QUEUES and self.addr: _queues_out.pop(self.addr, None) _queues_in.pop(self.addr, None) _clients.pop(self.addr, None) if self in _threadlist: _threadlist.remove(self) def run(self): assert isinstance(self.connection, socket.socket) q_in = self.queue_in q_out = self.queue_out connection = self.connection cdata = self.clientdata timeout_counter = 0 try: # catch any error -- even those fired while # the programmer made a mistake :) while timeout_counter < 20 \ and not self.stopped.is_set(): cdata.set_flag(self.FLAGNR_QUEUEFULL, q_in.__len__() >= MAX_QUEUE_SIZE) datablock = None if not cdata.flag_is_set(self.FLAGNR_QUEUEFULL): connection.settimeout(.1) try: datablock = connection.recv(MAX_INBUF_SIZE) except socket.timeout: timeout_counter += 1 if q_out.__len__() == 0: time.sleep(.3) except socket.error as e: # close connection on any error but timeout. cdata.add_error(e) self.stopped.set() break if datablock: # we got something timeout_counter = 0 q_in.append(datablock) cdata.lastseen_in = time.time() # As long as we receive data we do not send anything # because we want flood the client. continue # No incomming data at this point. # Lets see if we have something to send. connection.settimeout(2) while q_out.__len__() > 0: try: connection.sendall(q_out[0]) except socket.timeout: timeout_counter += 1 # Before we stick to long to this, # we better leave and try later. break # stop sending except socket.error as e: # unrecoverable error cdata.add_error(e) self.stopped.set() break else: # successfully sent! q_out.popleft() cdata.lastseen_out = time.time() timeout_counter = 0 if not cdata.flag_is_set(self.FLAGNR_KEEPALIVE): # seems to be http transfer self.stopped.set() # forced stop print("Client", self.addr, "closed.") try: if not cdata.flag_is_set(self.FLAGNR_KEEPALIVE): # Using shutdown allows the client to reconnect # with just the same conditions as before. self.connection.shutdown(socket.SHUT_RDWR) except Exception: pass self.connection = None _threadlist.remove(self) # global error-catcher except Exception as err: # whatever the reason for this error: # try to send it to the connection. self._write("UNECPECTED ERROR:", err) class PoserInterpreterThread(BaseThread): """ Thread interpreting incomming data. """ def __init__(self): super(PoserInterpreterThread, self).__init__() self.name = "PoserInterpreter" def run(self): while not self.stopped.is_set(): if _queues_in.__len__() == 0: time.sleep(.3) continue result = None for addr, queue in _queues_in.items(): clientdata = get_clientdata(addr) assert isinstance(queue, deque) while queue.__len__(): # compute incoming block = str(queue.popleft()) try: if block.startswith("="): result = eval(block[1:], clientdata.globals, clientdata.locals) elif block.startswith("%"): result = object_from_path(block[1:]) else: if "\n" in block: block = compile(block) exec (block, clientdata.globals, clientdata.locals) except Exception as e: _queues_out[addr].append(str(e)) # send back result if any if result is not None: _queues_out[addr].append(str(result)) class DispatcherThread(BaseThread): """ Main thread generating all other threads. """ def __init__(self): super(DispatcherThread, self).__init__() self.name = "Dispatcher" self.sys_stdout_save = None self.last_connection = None def run(self): print(self.name, "STARTED") assert isinstance(_tcpsocket, socket.socket) interpreter = PoserInterpreterThread() is_http = PORT in (80, 443, 8080) if is_http: self.sys_stdout_save = sys.stdout while not stopflag.is_set() and not self.stopped.is_set(): _tcpsocket.settimeout(1) try: connection, addr = _tcpsocket.accept() except socket.timeout: continue # timout is fine here. except socket.error as e: print("SOCKET ERROR in Dispatcher") break addr = addr[0] # we need the ip address keepalive = PORT not in (80, 443, 8080) if _clients.__len__() < 10: newclient = get_new_client_thread(addr, keepalive) newclient.connection = connection newclient.start() if not interpreter.is_alive(): interpreter.start() else: connection.settimeout(.2) try: connection.sendall("To mutch clients. Try later.\n") except socket.error: pass finally: connection.close() print("Too mutch clients.") cleanup() _tcpsocket.shutdown(socket.SHUT_RDWR) print(self.name, "STOPPED") # ------------------------------------------------------------------- # Start the show... # ------------------------------------------------------------------- def main(): global Dispatcher # Maybe there are fragments left from the last run # if the script stopped with an error and is re-started. cleanup() if Dispatcher.is_alive(): if not Dispatcher.stopped.is_set(): Dispatcher.stopped.set() Dispatcher.stopped.wait(timeout=10) if Dispatcher.is_alive(): raise RuntimeWarning("Can't stop previously started Dispatcher") if isinstance(_tcpsocket, socket.socket): try: _tcpsocket.shutdown(socket.SHUT_RDWR) except socket.error: pass if _setSocket() is None: raise IOError("Can't create a socket for host %s, port %s." % (HOST, PORT)) Dispatcher = DispatcherThread() Dispatcher.start() if __name__ == "__main__": try: main() except Exception as err: print("Aborted with", err)