hostray.util Reference

Worker

class hostray.util.worker.Worker

property:

  • is_func_running -> bool: True while the worker is running a function
run_method(func: Callable, *args, on_finish: Callable[[Any], None] = None, on_exception: Callable[[Exception], None] = None, **kwargs) → bool

return True if the given function instance is going to be executed

  • func: function instance to be executed
  • on_finish: callback invoked with the function's return value after the function has run
  • *args: variable number of arguments of method
  • on_exception: callback invoked with the Exception if the function raises one
  • **kwargs: keyworded, variable-length argument list of method
run_method_and_wait(func: Callable, *args, **kwargs) → Any

execute the function and return its return value (blocks the calling thread)

  • func: function instance to be executed
  • *args: variable number of arguments of method
  • **kwargs: keyworded, variable-length argument list of method
run_method_and_wait_async(func: Callable, *args, **kwargs) → Awaitable

asynchronously execute the function and return its return value

  • func: function instance to be executed
  • *args: variable number of arguments of method
  • **kwargs: keyworded, variable-length argument list of method
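The callback contract of run_method described above can be illustrated with a small stand-in. The sketch below does not import hostray: SketchWorker and its join() helper are hypothetical names built on threading, and they only mimic the documented behavior (return True when the function is going to be executed, call on_finish with the return value, call on_exception with the raised Exception). That a busy worker returns False is an assumption.

```python
import threading
from typing import Any, Callable, Optional


class SketchWorker:
    """Hypothetical stand-in, not hostray's Worker."""

    def __init__(self) -> None:
        self._thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()

    @property
    def is_func_running(self) -> bool:
        return self._thread is not None and self._thread.is_alive()

    def run_method(self, func: Callable, *args,
                   on_finish: Optional[Callable[[Any], None]] = None,
                   on_exception: Optional[Callable[[Exception], None]] = None,
                   **kwargs) -> bool:
        def target() -> None:
            try:
                result = func(*args, **kwargs)
            except Exception as error:
                if on_exception is not None:
                    on_exception(error)
            else:
                if on_finish is not None:
                    on_finish(result)

        with self._lock:
            if self.is_func_running:
                return False  # assumption: a busy worker rejects new work
            self._thread = threading.Thread(target=target, daemon=True)
            self._thread.start()
            return True

    def join(self) -> None:
        # Helper for this example only: wait for the current function to end.
        if self._thread is not None:
            self._thread.join()


results, errors = [], []
worker = SketchWorker()
accepted = worker.run_method(pow, 2, 10,
                             on_finish=results.append,
                             on_exception=errors.append)
worker.join()
worker.run_method(int, "not a number", on_exception=errors.append)
worker.join()
```

After both calls finish, results holds the return value of pow(2, 10) and errors holds the ValueError raised by int().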
class hostray.util.worker.FunctionQueueWorker

property:

  • pending_count -> int: number of functions waiting in the queue
run_method(func: Callable, *args, on_finish: Callable[[Any], None] = None, on_exception: Callable[[Exception], None] = None, **kwargs) → None

queue the function instance to be executed when worker is free

  • func: function instance to be executed
  • on_finish: callback invoked with the function's return value after the function has run
  • *args: variable number of arguments of method
  • on_exception: callback invoked with the Exception if the function raises one
  • **kwargs: keyworded, variable-length argument list of method
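The queueing behavior can be pictured as a single consumer thread draining a FIFO queue. The following is a minimal sketch under that assumption, not hostray's implementation; SketchQueueWorker and wait_until_idle() are hypothetical names introduced for illustration.

```python
import queue
import threading
from typing import Callable


class SketchQueueWorker:
    """Hypothetical stand-in, not hostray's FunctionQueueWorker."""

    def __init__(self) -> None:
        self._queue: queue.Queue = queue.Queue()
        threading.Thread(target=self._consume, daemon=True).start()

    @property
    def pending_count(self) -> int:
        return self._queue.qsize()

    def run_method(self, func: Callable, *args,
                   on_finish=None, on_exception=None, **kwargs) -> None:
        # Queue the call; the consumer thread runs it when it is free.
        self._queue.put((func, args, kwargs, on_finish, on_exception))

    def _consume(self) -> None:
        while True:
            func, args, kwargs, on_finish, on_exception = self._queue.get()
            try:
                result = func(*args, **kwargs)
            except Exception as error:
                if on_exception is not None:
                    on_exception(error)
            else:
                if on_finish is not None:
                    on_finish(result)
            finally:
                self._queue.task_done()

    def wait_until_idle(self) -> None:
        # Helper for this example only: block until every queued call is done.
        self._queue.join()


squares = []
queue_worker = SketchQueueWorker()
for n in (1, 2, 3):
    queue_worker.run_method(lambda x: x * x, n, on_finish=squares.append)
queue_worker.wait_until_idle()
```

Because one thread consumes the queue in order, the callbacks fire in the order the functions were queued.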
class hostray.util.worker.FunctionLoopWorker
run_method(func: Callable, *args, on_finish: Callable[[Any], None] = None, on_exception: Callable[[Exception], None] = None, **kwargs) → None

start and loop the given function instance

  • func: function instance to be executed
  • on_finish: callback invoked with the function's return value each time the function has run
  • *args: variable number of arguments of method
  • on_exception: callback invoked with the Exception each time the function raises one
  • **kwargs: keyworded, variable-length argument list of method
stop()

stop the loop if the worker is looping a function
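A looping worker with a stop() method is commonly built around a threading.Event. The sketch below shows that pattern only; SketchLoopWorker is a hypothetical class, and the short pause between iterations (interval) is an assumption, not a documented hostray parameter.

```python
import threading
from typing import Callable, Optional


class SketchLoopWorker:
    """Hypothetical stand-in, not hostray's FunctionLoopWorker."""

    def __init__(self, interval: float = 0.01) -> None:
        self._interval = interval  # assumption: pause between iterations
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def run_method(self, func: Callable, *args,
                   on_finish=None, on_exception=None, **kwargs) -> None:
        self._stop_event.clear()

        def loop() -> None:
            while not self._stop_event.is_set():
                try:
                    result = func(*args, **kwargs)
                except Exception as error:
                    if on_exception is not None:
                        on_exception(error)
                else:
                    if on_finish is not None:
                        on_finish(result)
                # Sleep, but wake up immediately when stop() is called.
                self._stop_event.wait(self._interval)

        self._thread = threading.Thread(target=loop, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()


ticks = []
three_done = threading.Event()


def on_tick(value: str) -> None:
    ticks.append(value)
    if len(ticks) >= 3:
        three_done.set()


loop_worker = SketchLoopWorker()
loop_worker.run_method(lambda: "tick", on_finish=on_tick)
three_done.wait(timeout=5)  # let the function run at least three times
loop_worker.stop()
count_after_stop = len(ticks)
```

Here on_finish fires once per iteration, and no further iterations start after stop() returns.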

class hostray.util.worker.WorkerPool

property:

  • workers() -> List[FunctionQueueWorker]
dispose() → None
info() → Dict
reserve_worker() → str

@contextmanager; yields the identity string of the reserved worker instance

run_method(func: Callable, *args, identity: str = None, **kwargs) → Any
  • func: function instance to be executed
  • *args: variable number of arguments of method
  • identity: identity string from reserve_worker
  • **kwargs: keyworded, variable-length argument list of method
broadcast_method(func_name: str, *args, **kwargs) → List[Any]

invoke the function named func_name on each worker that has one.

  • func_name: function name to be invoked
  • *args: variable number of arguments of method
  • **kwargs: keyworded, variable-length argument list of method
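The point of reserve_worker is that every run_method call made with the yielded identity lands on the same worker. The sketch below illustrates that idea with a hypothetical SketchPool in which each worker is a single-thread executor; it is not hostray's WorkerPool, and using a free worker when no identity is given is an assumption.

```python
import queue
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Any, Callable, Iterator, Optional


class SketchPool:
    """Hypothetical stand-in, not hostray's WorkerPool."""

    def __init__(self, size: int = 2) -> None:
        # One single-thread executor per worker, keyed by an identity string.
        self._workers = {uuid.uuid4().hex: ThreadPoolExecutor(max_workers=1)
                         for _ in range(size)}
        self._free: queue.Queue = queue.Queue()
        for identity in self._workers:
            self._free.put(identity)

    @contextmanager
    def reserve_worker(self) -> Iterator[str]:
        identity = self._free.get()  # blocks until a worker is free
        try:
            yield identity
        finally:
            self._free.put(identity)  # hand the worker back to the pool

    def run_method(self, func: Callable, *args,
                   identity: Optional[str] = None, **kwargs) -> Any:
        if identity is not None:
            return self._workers[identity].submit(func, *args, **kwargs).result()
        # Assumption: without an identity, any free worker is used.
        with self.reserve_worker() as reserved:
            return self._workers[reserved].submit(func, *args, **kwargs).result()

    def dispose(self) -> None:
        for executor in self._workers.values():
            executor.shutdown()


pool = SketchPool(size=2)
with pool.reserve_worker() as identity:
    first_thread = pool.run_method(threading.get_ident, identity=identity)
    second_thread = pool.run_method(threading.get_ident, identity=identity)
pool.dispose()
```

Both calls report the same thread id, which is what makes a reserved worker useful for thread-bound resources such as a database session.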
class hostray.util.worker.AsyncWorkerPool

inherits from hostray.util.worker.WorkerPool and adds asynchronous functions

reserve_worker_async() → str

@asynccontextmanager; yields the identity string of the reserved worker instance. hostray implements its own unofficial asynccontextmanager because Python 3.6 does not provide one.

run_method_async(func: Callable, *args, identity: str = None, **kwargs) → Any
  • func: function instance to be executed
  • *args: variable number of arguments of method
  • identity: identity string from reserve_worker
  • **kwargs: keyworded, variable-length argument list of method
broadcast_method_async(func_name: str, *args, **kwargs) → List[Any]

asynchronously invoke the Awaitable function named func_name on each worker that has one.

  • func_name: function name to be invoked
  • *args: variable number of arguments of method
  • **kwargs: keyworded, variable-length argument list of method
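For readers on Python 3.7 or later, the reservation pattern can be sketched with the standard contextlib.asynccontextmanager. SketchAsyncPool below is a hypothetical class that only demonstrates the "async with" reservation flow; it does not reproduce hostray's own decorator or its worker threads.

```python
import asyncio
from contextlib import asynccontextmanager  # standard library, Python 3.7+
from typing import AsyncIterator, Iterable, List, Optional, Tuple


class SketchAsyncPool:
    """Hypothetical stand-in, not hostray's AsyncWorkerPool."""

    def __init__(self, identities: Iterable[str]) -> None:
        self._identities = list(identities)
        self._free: Optional[asyncio.Queue] = None  # created inside the loop

    @asynccontextmanager
    async def reserve_worker_async(self) -> AsyncIterator[str]:
        if self._free is None:
            self._free = asyncio.Queue()
            for identity in self._identities:
                self._free.put_nowait(identity)
        identity = await self._free.get()  # waits without blocking the loop
        try:
            yield identity
        finally:
            self._free.put_nowait(identity)


async def demo() -> List[Tuple[str, str, str]]:
    pool = SketchAsyncPool(["worker-0"])  # a single worker forces queuing
    log: List[Tuple[str, str, str]] = []

    async def job(name: str) -> None:
        async with pool.reserve_worker_async() as identity:
            log.append((name, identity, "start"))
            await asyncio.sleep(0.01)
            log.append((name, identity, "end"))

    await asyncio.gather(job("a"), job("b"))
    return log


reservation_log = asyncio.run(demo())
```

With only one worker in the pool, job "b" cannot start until job "a" has released its reservation, while the event loop stays free in the meantime.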

Orm

get_declarative_base(key: str = 'default') → DeclarativeMeta

return the DeclarativeMeta metaclass managed under key

  • key: key of the managed DeclarativeMeta metaclass
get_session_maker(db_module: DB_MODULE_NAME, declared_entity_base: DeclarativeMeta, autoflush: bool = False, **kwargs) → Session

return sqlalchemy.orm.Session class type

  • db_module: enum hostray.util.orm.DB_MODULE_NAME
  • declared_entity_base: all ORM entity classes should inherit from this sqlalchemy.ext.declarative.api.DeclarativeMeta base before this function is called
  • autoflush: enable/disable sqlalchemy.orm.Session autoflush
class hostray.util.orm.EntityBaseAddon

define entity helper functions

property:

  • column_type_validations: Dict[str, Any] = {}

    column types used for validation

  • column_fix: List[str] = []

    columns whose values are not allowed to be updated

  • client_excluded_columns: List[str] = []

    columns excluded from the entity data returned to the client

  • dt_converter = PY_DT_Converter

    datetime converter used when converting database values to a JSON-serializable dict

  • identity -> Tuple[Any]

    return tuple of columns as identification

  • primary_key_args -> Dict[str, Any]

    return key-value dict of primary key columns

  • non_primary_key_args -> Dict[str, Any]

    return key-value dict of non primary key columns

primary_keys() → List[str]

return list of primary key column names

non_primary_keys() → List[str]

return list of non primary key column names

columns() → List[str]

return list of column names

get_primary_key_args(**kwargs) → Dict[str, Any]

return key-value dict of the primary key columns present in **kwargs

  • **kwargs: keyworded, variable-length argument list of method
get_non_primary_key_args(**kwargs) → Dict[str, Any]

return key-value dict of the non-primary-key columns present in **kwargs

  • **kwargs: keyworded, variable-length argument list of method
get_entity_args(**kwargs) → Dict[str, Any]

return key-value dict of the entity variables present in **kwargs

  • **kwargs: keyworded, variable-length argument list of method
get_non_entity_args(**kwargs) → Dict[str, Any]

return key-value dict of the non-entity variables present in **kwargs

  • **kwargs: keyworded, variable-length argument list of method
parameter_validation(check_fix: bool = True, **kwargs) → None

validate the variables in **kwargs against the specified column_type_validations

  • check_fix: raise Exception if check_fix is True
  • **kwargs: keyworded, variable-length argument list of method
to_client_dict() → Dict[str, Any]

return a dict that excludes the keys specified in client_excluded_columns

to_dict() → Dict[str, Any]

return dict of entity columns

equals(r: Entity) → bool

return True if r equals this entity
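The argument-splitting helpers above are easiest to follow on a concrete payload. The sketch below is a plain-Python stand-in: SketchEntity is hypothetical, its column metadata is hard-coded rather than read from sqlalchemy, and modelling the get_*_args helpers as classmethods is an assumption.

```python
from typing import Any, Dict, List


class SketchEntity:
    """Hypothetical plain-Python stand-in, not hostray's EntityBaseAddon."""

    # Assumption: column metadata is hard-coded instead of read from sqlalchemy.
    _columns: List[str] = ["id", "name", "password"]
    _primary_keys: List[str] = ["id"]
    client_excluded_columns: List[str] = ["password"]

    def __init__(self, **kwargs: Any) -> None:
        for column in self._columns:
            setattr(self, column, kwargs.get(column))

    @classmethod
    def get_primary_key_args(cls, **kwargs: Any) -> Dict[str, Any]:
        return {k: v for k, v in kwargs.items() if k in cls._primary_keys}

    @classmethod
    def get_non_primary_key_args(cls, **kwargs: Any) -> Dict[str, Any]:
        return {k: v for k, v in kwargs.items()
                if k in cls._columns and k not in cls._primary_keys}

    @classmethod
    def get_entity_args(cls, **kwargs: Any) -> Dict[str, Any]:
        return {k: v for k, v in kwargs.items() if k in cls._columns}

    @classmethod
    def get_non_entity_args(cls, **kwargs: Any) -> Dict[str, Any]:
        return {k: v for k, v in kwargs.items() if k not in cls._columns}

    def to_dict(self) -> Dict[str, Any]:
        return {column: getattr(self, column) for column in self._columns}

    def to_client_dict(self) -> Dict[str, Any]:
        return {k: v for k, v in self.to_dict().items()
                if k not in self.client_excluded_columns}


# "page" is not a column, so it ends up in the non-entity arguments.
payload = {"id": 1, "name": "amy", "password": "secret", "page": 2}
entity = SketchEntity(**SketchEntity.get_entity_args(**payload))
```

In this sketch a request payload can be split into the part that identifies a row, the part that updates it, and the part that is unrelated to the entity, while to_client_dict() hides the excluded column.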

class hostray.util.orm.OrmDBEntityAccessor

DB access worker that owns a DB session and connection instance, based on sqlalchemy.

set_orm_engine(db_module: DB_MODULE_NAME, declared_entity_base: DeclarativeMeta, autoflush: bool = False, **kwargs) → None

setup parameters to create sqlalchemy.engine.Engine instance

  • db_module: enum hostray.util.orm.DB_MODULE_NAME
  • declared_entity_base: DeclarativeMeta contains the schema meta of entity class
  • autoflush: sets autoflush; refer to sqlalchemy.orm.session.sessionmaker
close_session() → None

release the session and connection.

Attention

close_session() should also be called in the worker thread

class hostray.util.orm.OrmAccessWorkerPool

pool of hostray.util.orm.OrmDBEntityAccessor. Inherits from hostray.util.worker.AsyncWorkerPool

enable_orm_log(echo: bool = False) → None

enable or disable the stdout output of sqlalchemy's default logger

set_session_maker(db_module: DB_MODULE_NAME, declared_entity_base: DeclarativeMeta, autoflush: bool = False, **kwargs) → None

setup parameters to create sqlalchemy.engine.Engine instance

  • db_module: enum hostray.util.orm.DB_MODULE_NAME
  • declared_entity_base: DeclarativeMeta contains the schema meta of entity class
  • autoflush: sets autoflush; refer to sqlalchemy.orm.session.sessionmaker
reset_connection() → None

release all of the workers’ session and connection.

reset_connection_async() → None

asynchronously release all of the workers’ session and connection.

Util

get_class(module: str, *attrs) → type

return the type or function instance found in the imported module

example:

cls = get_class("module", "class / static function", "class static function")
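One plausible reading of this lookup is "import the module, then follow each attribute name in turn". The sketch below implements that reading with importlib; get_class_sketch is a hypothetical name, and hostray's actual implementation may differ.

```python
import importlib
import os.path
from typing import Any


def get_class_sketch(module: str, *attrs: str) -> Any:
    """Import `module`, then resolve each name in `attrs` with getattr."""
    target: Any = importlib.import_module(module)
    for attr in attrs:
        target = getattr(target, attr)
    return target


# A module-level function, and a class followed by one of its class methods.
join_func = get_class_sketch("os.path", "join")
fromkeys = get_class_sketch("collections", "OrderedDict", "fromkeys")
```

Resolving names at run time like this is useful when class paths come from a configuration file rather than from import statements.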
join_to_abs_path(*paths) → str

return the absolute path of the os.path.join() result in Linux format, i.e. with ‘\’ replaced by ‘/’

join_path(*paths) → str

return the os.path.join() result in Linux format, i.e. with ‘\’ replaced by ‘/’
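The two path helpers can be approximated in a few lines. The functions below are hypothetical equivalents written from the descriptions above (join, optionally make absolute, then replace backslashes), not copies of hostray's code.

```python
import os


def join_path_sketch(*paths: str) -> str:
    """Join the parts, then normalize every backslash to a forward slash."""
    return os.path.join(*paths).replace("\\", "/")


def join_to_abs_path_sketch(*paths: str) -> str:
    """Same as above, but resolve the joined path to an absolute path first."""
    return os.path.abspath(os.path.join(*paths)).replace("\\", "/")


relative = join_path_sketch("data", "raw\\2020", "a.csv")
absolute = join_to_abs_path_sketch("data", "a.csv")
```

Note that the replacement is applied to the whole string, so backslashes inside the input parts are converted as well.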

walk_to_file_paths(file_or_directory: str) → List[str]

return a list of absolute file paths: every file found recursively under the input directory, or the input file itself
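A recursive listing of this kind is typically built on os.walk. The following sketch assumes that a file input yields a one-element list and that paths are made absolute with os.path.abspath; walk_to_file_paths_sketch is a hypothetical name, and the order of the returned paths is not specified.

```python
import os
import tempfile
from typing import List


def walk_to_file_paths_sketch(file_or_directory: str) -> List[str]:
    """Return absolute paths of the file itself or of all files below a directory."""
    if os.path.isfile(file_or_directory):
        return [os.path.abspath(file_or_directory)]
    found: List[str] = []
    for root, _dirs, files in os.walk(file_or_directory):
        for name in files:
            found.append(os.path.abspath(os.path.join(root, name)))
    return found


# Build a small throwaway tree: a.txt and sub/b.txt.
with tempfile.TemporaryDirectory() as tree_root:
    os.makedirs(os.path.join(tree_root, "sub"))
    for relative_name in ("a.txt", os.path.join("sub", "b.txt")):
        with open(os.path.join(tree_root, relative_name), "w") as handle:
            handle.write("x")
    found_names = sorted(os.path.basename(p)
                         for p in walk_to_file_paths_sketch(tree_root))
    single = walk_to_file_paths_sketch(os.path.join(tree_root, "a.txt"))
    single_is_absolute = os.path.isabs(single[0])
```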

size_bytes_to_string(f_size: int, units: List[str] = ['bytes', 'KB', 'MB', 'GB', 'TB', 'PB']) → str

return the byte size as a string expressed in a suitable unit
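The reference does not state the divisor or the output format, so the sketch below makes both explicit as assumptions: it divides by 1024 per unit step and prints two decimals. size_bytes_to_string_sketch is a hypothetical name, and hostray's real output may be formatted differently.

```python
from typing import Sequence


def size_bytes_to_string_sketch(
        f_size: int,
        units: Sequence[str] = ("bytes", "KB", "MB", "GB", "TB", "PB")) -> str:
    """Scale f_size down by 1024 until it fits the largest suitable unit."""
    size = float(f_size)
    index = 0
    while size >= 1024 and index < len(units) - 1:
        size /= 1024  # assumption: binary (1024-based) steps
        index += 1
    return f"{size:.2f} {units[index]}"  # assumption: two decimals, one space


small = size_bytes_to_string_sketch(512)
medium = size_bytes_to_string_sketch(1536)
large = size_bytes_to_string_sketch(1024 ** 3)
```

The sketch uses a tuple for the default units so that the default cannot be mutated between calls.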

generate_base64_uid(byte_length: int = 32, urlsafe: bool = True) → str

return a customized unique ID string
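Judging by the function name and parameters, one way to produce such an ID is to base64-encode random bytes. The sketch below does exactly that with os.urandom; this is an assumption about the approach, not hostray's confirmed implementation, and generate_base64_uid_sketch is a hypothetical name.

```python
import base64
import os


def generate_base64_uid_sketch(byte_length: int = 32, urlsafe: bool = True) -> str:
    """Encode `byte_length` random bytes as a base64 string."""
    raw = os.urandom(byte_length)
    if urlsafe:
        encoded = base64.urlsafe_b64encode(raw)  # uses '-' and '_' instead of '+' and '/'
    else:
        encoded = base64.b64encode(raw)
    return encoded.decode("ascii")


uid = generate_base64_uid_sketch()
other_uid = generate_base64_uid_sketch()
```

The URL-safe alphabet matters when the ID is placed in a URL or a cookie, where '+' and '/' would need escaping.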

convert_tuple_to_dict(t: tuple, key_name: str) → Dict

return customized dict from tuple

example:

d = convert_tuple_to_dict((1, 2, 3), 'n')

# d is {'n_1': 1, 'n_2': 2, 'n_3': 3}
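The key pattern in the example (key_name, an underscore, then a 1-based position) can be reproduced with a dict comprehension. convert_tuple_to_dict_sketch below is a hypothetical equivalent inferred from that single example.

```python
from typing import Any, Dict, Tuple


def convert_tuple_to_dict_sketch(t: Tuple[Any, ...], key_name: str) -> Dict[str, Any]:
    """Map each element to the key '<key_name>_<1-based position>'."""
    return {f"{key_name}_{index}": value
            for index, value in enumerate(t, start=1)}


numbered = convert_tuple_to_dict_sketch((1, 2, 3), "n")
```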
get_host_ip(remote_host: str = '8.8.8.8', port: int = 80) → str

return the host IP; the result is not guaranteed to be the actual host IP
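The remote_host and port parameters suggest the common trick of "connecting" a UDP socket and reading back the local address the operating system selected. The sketch below shows that trick; it is an assumption about the approach, get_host_ip_sketch is a hypothetical name, and the loopback fallback is added only for this example.

```python
import socket


def get_host_ip_sketch(remote_host: str = "8.8.8.8", port: int = 80) -> str:
    """Return the local address the OS would use to reach remote_host."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # Connecting a UDP socket sends no packet; it only selects a route.
        sock.connect((remote_host, port))
        return sock.getsockname()[0]
    except OSError:
        return "127.0.0.1"  # assumption: fall back to loopback without a route
    finally:
        sock.close()


host_ip = get_host_ip_sketch()
```

On a machine with several network interfaces or behind NAT, the address found this way is only the one used for the chosen route, which is why the result carries no guarantee.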