# Source code for reahl.ptongue.gemproxyrpc

# Copyright (C) 2025 Reahl Software Services (Pty) Ltd
# 
# This file is part of parseltongue.
#
# parseltongue is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# parseltongue is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with parseltongue.  If not, see <https://www.gnu.org/licenses/>.
"""
RPC Sessions
============

This module provides thread-safe remote procedure call (RPC) connectivity to 
GemStone/S 64 Bit object databases.
"""
import ctypes
import os
import warnings

from .gemstone import *
from .gemproxy import GemstoneLibrary, GemObject, GemstoneSession, GemstoneError, to_c_bytes, InvalidSession, \
    GemstoneApiError, GemstoneWarning


class GciTs(GemstoneLibrary):
    """
    ctypes binding of the thread-safe GemStone C interface library ('gcits').

    The constructor looks up each ``GciTs*`` entry point on the loaded shared
    library and declares its return type and argument types, so that ctypes
    converts arguments and results correctly. Version specific subclasses add
    the ``GciTsLogin`` binding, whose signature differs between releases.
    """
    short_name = 'gcits'

    # Upper bound for the output buffer tried by encrypt_password(); reaching it
    # means GciTsEncrypt keeps failing for a reason other than buffer size.
    max_encrypt_buffer_size = 1 << 16

    def __init__(self, lib_path):
        """
        Load the library and declare the signatures of the functions used.

        :param lib_path: Passed unchanged to the GemstoneLibrary constructor.
        """
        super().__init__(lib_path)
        # Step (in bytes) by which encrypt_password() grows its output buffer.
        self.initial_fetch_size = 200

        # --- password encryption, session life cycle ---
        self.GciTsEncrypt = self.library.GciTsEncrypt
        self.GciTsEncrypt.restype = ctypes.c_char_p
        self.GciTsEncrypt.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_size_t]

        self.GciTsLogout = self.library.GciTsLogout
        self.GciTsLogout.restype = BoolType
        self.GciTsLogout.argtypes = [GciSession, ctypes.POINTER(GciErrSType)]

        self.GciTsSessionIsRemote = self.library.GciTsSessionIsRemote
        self.GciTsSessionIsRemote.restype = ctypes.c_int
        self.GciTsSessionIsRemote.argtypes = [GciSession]

        # --- code execution and message sends ---
        self.GciTsExecute = self.library.GciTsExecute
        self.GciTsExecute.restype = OopType
        self.GciTsExecute.argtypes = [GciSession, ctypes.c_char_p, OopType, OopType, OopType, ctypes.c_int, ctypes.c_ushort, ctypes.POINTER(GciErrSType)]

        self.GciTsPerform = self.library.GciTsPerform
        self.GciTsPerform.restype = OopType
        self.GciTsPerform.argtypes = [GciSession, OopType, OopType, ctypes.c_char_p, ctypes.POINTER(OopType), ctypes.c_int, ctypes.c_int,
                                      ctypes.c_ushort, ctypes.POINTER(GciErrSType)]

        # --- symbols and classes ---
        self.GciTsResolveSymbol = self.library.GciTsResolveSymbol
        self.GciTsResolveSymbol.restype = OopType
        self.GciTsResolveSymbol.argtypes = [GciSession, ctypes.c_char_p, OopType, ctypes.POINTER(GciErrSType)]

        self.GciTsResolveSymbolObj = self.library.GciTsResolveSymbolObj
        self.GciTsResolveSymbolObj.restype = OopType
        self.GciTsResolveSymbolObj.argtypes = [GciSession, OopType, OopType, ctypes.POINTER(GciErrSType)]

        self.GciTsNewSymbol = self.library.GciTsNewSymbol
        self.GciTsNewSymbol.restype = OopType
        self.GciTsNewSymbol.argtypes = [GciSession, ctypes.c_char_p, ctypes.POINTER(GciErrSType)]

        self.GciTsIsKindOf = self.library.GciTsIsKindOf
        self.GciTsIsKindOf.restype = ctypes.c_int
        self.GciTsIsKindOf.argtypes = [GciSession, OopType, OopType, ctypes.POINTER(GciErrSType)]

        self.GciTsFetchClass = self.library.GciTsFetchClass
        self.GciTsFetchClass.restype = OopType
        self.GciTsFetchClass.argtypes = [GciSession, OopType, ctypes.POINTER(GciErrSType)]

        # --- transaction control ---
        self.GciTsAbort = self.library.GciTsAbort
        self.GciTsAbort.restype = BoolType
        self.GciTsAbort.argtypes = [GciSession, ctypes.POINTER(GciErrSType)]

        self.GciTsCommit = self.library.GciTsCommit
        self.GciTsCommit.restype = BoolType
        self.GciTsCommit.argtypes = [GciSession, ctypes.POINTER(GciErrSType)]

        self.GciTsBegin = self.library.GciTsBegin
        self.GciTsBegin.restype = BoolType
        self.GciTsBegin.argtypes = [GciSession, ctypes.POINTER(GciErrSType)]

        # --- number conversions ---
        self.GciTsOopToDouble = self.library.GciTsOopToDouble
        self.GciTsOopToDouble.restype = BoolType
        self.GciTsOopToDouble.argtypes = [GciSession, OopType, ctypes.POINTER(ctypes.c_double), ctypes.POINTER(GciErrSType)]

        self.GciTsOopToI64 = self.library.GciTsOopToI64
        self.GciTsOopToI64.restype = BoolType
        self.GciTsOopToI64.argtypes = [GciSession, OopType, ctypes.POINTER(ctypes.c_int64), ctypes.POINTER(GciErrSType)]

        self.GciTsDoubleToOop = self.library.GciTsDoubleToOop
        self.GciTsDoubleToOop.restype = OopType
        self.GciTsDoubleToOop.argtypes = [GciSession, ctypes.c_double, ctypes.POINTER(GciErrSType)]

        self.GciTsI64ToOop = self.library.GciTsI64ToOop
        self.GciTsI64ToOop.restype = OopType
        self.GciTsI64ToOop.argtypes = [GciSession, ctypes.c_int64, ctypes.POINTER(GciErrSType)]

        # --- string and byte transfer ---
        self.GciTsFetchUtf8 = self.library.GciTsFetchUtf8
        self.GciTsFetchUtf8.restype = ctypes.c_int64
        self.GciTsFetchUtf8.argtypes = [GciSession, OopType, ctypes.POINTER(ByteType), ctypes.c_int64, ctypes.POINTER(ctypes.c_int64), ctypes.POINTER(GciErrSType)]

        self.GciTsFetchUtf8Bytes = self.library.GciTsFetchUtf8Bytes
        self.GciTsFetchUtf8Bytes.restype = ctypes.c_int64
        self.GciTsFetchUtf8Bytes.argtypes = [GciSession, OopType, ctypes.c_int64, ctypes.POINTER(ByteType), ctypes.c_int64, ctypes.POINTER(OopType), ctypes.POINTER(GciErrSType), ctypes.c_int]

        self.GciTsFetchBytes = self.library.GciTsFetchBytes
        self.GciTsFetchBytes.restype = ctypes.c_int64
        self.GciTsFetchBytes.argtypes = [GciSession, OopType, ctypes.c_int64, ctypes.POINTER(ByteType), ctypes.c_int64, ctypes.POINTER(GciErrSType)]

        self.GciTsNewUtf8String = self.library.GciTsNewUtf8String
        self.GciTsNewUtf8String.restype = OopType
        self.GciTsNewUtf8String.argtypes = [GciSession, ctypes.c_char_p, BoolType, ctypes.POINTER(GciErrSType)]

        # --- object release and process control ---
        self.GciTsReleaseObjs = self.library.GciTsReleaseObjs
        self.GciTsReleaseObjs.restype = BoolType
        self.GciTsReleaseObjs.argtypes = [GciSession, ctypes.POINTER(OopType), ctypes.c_int, ctypes.POINTER(GciErrSType)]

        self.GciTsContinueWith = self.library.GciTsContinueWith
        self.GciTsContinueWith.restype = OopType
        self.GciTsContinueWith.argtypes = [GciSession, OopType, OopType, ctypes.POINTER(GciErrSType), ctypes.c_int, ctypes.POINTER(GciErrSType)]

        self.GciTsClearStack = self.library.GciTsClearStack
        self.GciTsClearStack.restype = BoolType
        self.GciTsClearStack.argtypes = [GciSession, OopType, ctypes.POINTER(GciErrSType)]

    def encrypt_password(self, unencrypted_password):
        """
        Encrypt a password with GciTsEncrypt.

        The output buffer starts at ``initial_fetch_size`` bytes and grows by
        the same amount each time the library reports failure, until the call
        succeeds.

        :param unencrypted_password: The clear text password (str), or a falsy
                                     value when there is no password.
        :return: The encrypted password as bytes, or None when no password was given.
        :raises GemstoneApiError: If encryption still fails once the buffer has
                                  reached ``max_encrypt_buffer_size`` bytes.
        """
        if not unencrypted_password:
            return None

        clear_text = unencrypted_password.encode('utf-8')
        out_buff_size = 0
        while True:
            out_buff_size = out_buff_size + self.initial_fetch_size
            out_buff = ctypes.create_string_buffer(out_buff_size)
            # The restype is c_char_p, so ctypes hands a NULL result back as None
            # (never as the integer 0); None is what signals "try a bigger buffer".
            if self.GciTsEncrypt(clear_text, out_buff, out_buff_size) is not None:
                return out_buff.value
            if out_buff_size >= self.max_encrypt_buffer_size:
                raise GemstoneApiError('GciTsEncrypt failed with an output buffer of {} bytes'.format(out_buff_size))


class GciTs34(GciTs):
    """
    Thread-safe GCI binding registered for library versions 3.4.0 up to 3.4.9999.

    Adds ``GciTsLogin`` with the ten argument signature declared in the constructor.
    """
    min_version = '3.4.0'
    max_version = '3.4.9999'

    def __init__(self, lib_path):
        """
        Declare the GciTsLogin signature on top of the common bindings.

        :param lib_path: Passed unchanged to the GciTs constructor.
        """
        super().__init__(lib_path)

        login = self.library.GciTsLogin
        login.restype = GciSession
        # The argument order mirrors the call made in log_in().
        login.argtypes = [
            ctypes.c_char_p,
            ctypes.c_char_p,
            ctypes.c_char_p,
            BoolType,
            ctypes.c_char_p,
            ctypes.c_char_p,
            ctypes.c_char_p,
            ctypes.c_uint,
            ctypes.c_int,
            ctypes.POINTER(GciErrSType),
        ]
        self.GciTsLogin = login

    def log_in(self, stone_name, host_username, host_password, netldi_task, username, password):
        """
        Log in via GciTsLogin and return the resulting session handle.

        Both passwords are run through encrypt_password() before the call and
        the login flags announce them as encrypted.

        :param stone_name: Name of the stone to log in to (str).
        :param host_username: Host user name, converted with to_c_bytes.
        :param host_password: Host password in clear text, or a falsy value.
        :param netldi_task: Name of the netldi task (str).
        :param username: GemStone user name (str).
        :param password: GemStone password in clear text, or a falsy value.
        :return: The value returned by GciTsLogin.
        :raises GemstoneError: If GciTsLogin returns a falsy session.
        """
        error = GciErrSType()
        login_arguments = (
            stone_name.encode('utf-8'),
            to_c_bytes(host_username),
            self.encrypt_password(host_password),
            True,
            netldi_task.encode('utf-8'),
            username.encode('utf-8'),
            self.encrypt_password(password),
            GCI_LOGIN_PW_ENCRYPTED | GCI_LOGIN_QUIET,
            0,
        )
        session = self.GciTsLogin(*login_arguments, ctypes.byref(error))

        if session:
            return session
        raise GemstoneError(self, error)


GemstoneLibrary.register(GciTs34)


class GciTs35(GciTs):
    """
    Thread-safe GCI binding registered for library versions 3.5.0 up to 3.7.9999.

    Adds ``GciTsLogin`` with the eleven argument signature declared in the
    constructor, which has an extra BoolType output argument before the error
    structure.
    """
    min_version = '3.5.0'
    max_version = '3.7.9999'

    def __init__(self, lib_path):
        """
        Declare the GciTsLogin signature on top of the common bindings.

        :param lib_path: Passed unchanged to the GciTs constructor.
        """
        super().__init__(lib_path)

        login = self.library.GciTsLogin
        login.restype = GciSession
        # The argument order mirrors the call made in log_in().
        login.argtypes = [
            ctypes.c_char_p,
            ctypes.c_char_p,
            ctypes.c_char_p,
            BoolType,
            ctypes.c_char_p,
            ctypes.c_char_p,
            ctypes.c_char_p,
            ctypes.c_uint,
            ctypes.c_int,
            ctypes.POINTER(BoolType),
            ctypes.POINTER(GciErrSType),
        ]
        self.GciTsLogin = login

    def log_in(self, stone_name, host_username, host_password, netldi_task, username, password):
        """
        Log in via GciTsLogin and return the resulting session handle.

        Both passwords are run through encrypt_password() before the call and
        the login flags announce them as encrypted. When the login succeeds
        but the library leaves the session-init output flag at zero, a
        GemstoneWarning built from the error structure is issued.

        :param stone_name: Name of the stone to log in to (str).
        :param host_username: Host user name, converted with to_c_bytes.
        :param host_password: Host password in clear text, or a falsy value.
        :param netldi_task: Name of the netldi task (str).
        :param username: GemStone user name (str).
        :param password: GemStone password in clear text, or a falsy value.
        :return: The value returned by GciTsLogin.
        :raises GemstoneError: If GciTsLogin returns a falsy session.
        """
        error = GciErrSType()
        executed_session_init = ctypes.c_int()
        login_arguments = (
            stone_name.encode('utf-8'),
            to_c_bytes(host_username),
            self.encrypt_password(host_password),
            True,
            netldi_task.encode('utf-8'),
            username.encode('utf-8'),
            self.encrypt_password(password),
            GCI_LOGIN_PW_ENCRYPTED | GCI_LOGIN_QUIET,
            0,
        )
        session = self.GciTsLogin(*login_arguments,
                                  ctypes.byref(executed_session_init),
                                  ctypes.byref(error))

        if not session:
            raise GemstoneError(self, error)

        # A ctypes integer is falsy while it holds zero.
        # NOTE(review): presumably the error structure then carries the details
        # of the failed session initialisation - confirm against the GCI docs.
        if not executed_session_init:
            details = '{}: {}, {}'.format(error.exceptionObj, error.message, error.reason)
            warnings.warn(details.replace('\\n', ''), GemstoneWarning)

        return session


GemstoneLibrary.register(GciTs35)


#======================================================================================================================
class RPCSession(GemstoneSession):
    """
    A session that interacts with a remote Gemstone database via remote procedure call.

    Creating an RPCSession implies logging in, and for a gem to be created for the
    session on the remote side.

    :param username: GemStone username for repository authentication
    :param password: GemStone password for repository authentication
    :param stone_name: Name of the stone (repository) to connect to
    :param host_username: Operating system username for host authentication
    :param host_password: Operating system password for host authentication
    :param netldi_task: Network service name
    """
    def __init__(self, username, password, stone_name='gs64stone', host_username=None, host_password=None, netldi_task='gemnetobject'):
        super().__init__()
        self.gci = GemstoneLibrary.find_library('gcits')
        self.c_session = self.gci.log_in(stone_name, host_username, host_password, netldi_task, username, password)

    def encrypt_password(self, unencrypted_password):
        """Encrypt a password by delegating to the loaded GCI library."""
        return self.gci.encrypt_password(unencrypted_password)

    def remove_dead_gemstone_objects(self):
        """
        Release the queued oops that no longer appear in ``self.instances``.

        The oops are released with a single GciTsReleaseObjs call, after which
        the queue of deallocated-but-unfreed oops is emptied.

        :raises GemstoneError: If the release call fails
        """
        error = GciErrSType()
        unreferenced_gemstone_objects = [oop for oop in self.deallocated_unfreed_gemstone_objects if oop not in self.instances]
        if unreferenced_gemstone_objects:
            c_dead_oops = (OopType * len(unreferenced_gemstone_objects))(*unreferenced_gemstone_objects)
            if not self.gci.GciTsReleaseObjs(self.c_session, c_dead_oops, len(unreferenced_gemstone_objects), ctypes.byref(error)):
                raise GemstoneError(self, error)
        self.deallocated_unfreed_gemstone_objects.clear()

    def abort(self):
        """
        Abort the current transaction.

        :raises GemstoneError: If the abort operation fails
        """
        error = GciErrSType()
        if not self.gci.GciTsAbort(self.c_session, ctypes.byref(error)):
            raise GemstoneError(self, error)

    def begin(self):
        """
        Begin a new transaction.

        :raises GemstoneError: If the begin operation fails
        """
        error = GciErrSType()
        if not self.gci.GciTsBegin(self.c_session, ctypes.byref(error)):
            raise GemstoneError(self, error)

    def commit(self):
        """
        Commit the current transaction.

        :raises GemstoneError: If the commit operation fails
        """
        error = GciErrSType()
        if not self.gci.GciTsCommit(self.c_session, ctypes.byref(error)):
            raise GemstoneError(self, error)

    @property
    def is_remote(self):
        """
        Check if the session is remote.

        :return: True if the session is remote, False otherwise
        :raises InvalidSession: If the session is invalid
        """
        remote = self.gci.GciTsSessionIsRemote(self.c_session)
        if remote == -1:
            raise InvalidSession()
        return bool(remote)

    @property
    def is_logged_in(self):
        """
        Check if the session is currently logged in.

        :return: True if the session is logged in, False otherwise
        """
        remote = self.gci.GciTsSessionIsRemote(self.c_session)
        return remote != -1

    def py_to_string_(self, py_str):
        """
        Create a GemStone string from a Python str and return its oop.

        :param py_str: The str to transfer; it is sent UTF-8 encoded
        :return: The oop returned by GciTsNewUtf8String
        :raises GemstoneError: If the string could not be created
        """
        error = GciErrSType()
        return_oop = self.gci.GciTsNewUtf8String(self.c_session, py_str.encode('utf-8'), True, ctypes.byref(error))
        if return_oop == OOP_ILLEGAL.value:
            raise GemstoneError(self, error)
        return return_oop

    def py_to_float_(self, py_float):
        """
        Create a GemStone float from a Python float and return its oop.

        :param py_float: The float to transfer
        :return: The oop returned by GciTsDoubleToOop
        :raises GemstoneError: If the float could not be created
        """
        error = GciErrSType()
        return_oop = self.gci.GciTsDoubleToOop(self.c_session, py_float, ctypes.byref(error))
        if return_oop == OOP_ILLEGAL.value:
            raise GemstoneError(self, error)
        return return_oop

    def execute(self, source, context=None, symbol_list=None):
        """
        Execute a GemStone Smalltalk expression.

        :param source: String or GemObject containing Smalltalk code to execute
        :param context: Optional context object for the execution
        :param symbol_list: Optional symbol list for name resolution
        :return: GemObject representing the result of execution
        :raises GemstoneApiError: If source is not a string or GemObject
        :raises GemstoneError: If execution fails
        """
        error = GciErrSType()
        if isinstance(source, str):
            return_oop = self.gci.GciTsExecute(self.c_session, source.encode('utf-8'), OOP_CLASS_Utf8,
                                               context.oop if context else OOP_NIL,
                                               symbol_list.oop if symbol_list else OOP_NIL,
                                               0, 0, ctypes.byref(error))
        elif isinstance(source, GemObject):
            return_oop = self.gci.GciTsExecute(self.c_session, None, source.oop,
                                               context.oop if context else OOP_NIL,
                                               symbol_list.oop if symbol_list else OOP_NIL,
                                               0, 0, ctypes.byref(error))
        else:
            raise GemstoneApiError('Source is type {}.Expected source to be a str or GemObject'.format(source.__class__.__name__))

        if return_oop == OOP_ILLEGAL.value:
            raise GemstoneError(self, error)
        return self.get_or_create_gem_object(return_oop)

    def new_symbol(self, py_string):
        """
        Create a new GemStone symbol.

        :param py_string: String to convert to a symbol
        :return: GemObject representing the created symbol
        :raises GemstoneError: If symbol creation fails
        """
        error = GciErrSType()
        return_oop = self.gci.GciTsNewSymbol(self.c_session, py_string.encode('utf-8'), ctypes.byref(error))
        if return_oop == OOP_ILLEGAL.value:
            raise GemstoneError(self, error)
        return self.get_or_create_gem_object(return_oop)

    def resolve_symbol(self, symbol, symbol_list=None):
        """
        Resolve a symbol in the GemStone symbol list.

        There is a shorthand for this method. These lines are equivalent::

            session.SymbolName
            session.resolve_symbol('SymbolName')

        :param symbol: String or GemObject symbol to resolve
        :param symbol_list: Optional symbol list for resolution
        :return: GemObject representing the resolved symbol
        :raises GemstoneApiError: If symbol is not a string or GemObject
        :raises GemstoneError: If resolution fails
        """
        error = GciErrSType()
        if isinstance(symbol, str):
            return_oop = self.gci.GciTsResolveSymbol(self.c_session, symbol.encode('utf-8'),
                                                     symbol_list.oop if symbol_list else OOP_NIL,
                                                     ctypes.byref(error))
        elif isinstance(symbol, GemObject):
            return_oop = self.gci.GciTsResolveSymbolObj(self.c_session, symbol.oop,
                                                        symbol_list.oop if symbol_list else OOP_NIL,
                                                        ctypes.byref(error))
        else:
            raise GemstoneApiError('Symbol is type {}.Expected symbol to be a str or GemObject'.format(symbol.__class__.__name__))

        if return_oop == OOP_ILLEGAL.value:
            raise GemstoneError(self, error)
        return self.get_or_create_gem_object(return_oop)

    def log_out(self):
        """
        Log out from the GemStone session.

        :raises GemstoneError: If logout fails
        """
        error = GciErrSType()
        if not self.gci.GciTsLogout(self.c_session, ctypes.byref(error)):
            raise GemstoneError(self, error)

    def object_is_kind_of(self, instance, a_class):
        """
        Ask the gem whether instance is a kind of a_class.

        :param instance: GemObject to test
        :param a_class: GemObject for the class to test against
        :return: The result of GciTsIsKindOf as a bool
        :raises GemstoneError: If GciTsIsKindOf returns -1
        """
        error = GciErrSType()
        is_kind_of_result = self.gci.GciTsIsKindOf(self.c_session, instance.oop, a_class.oop, ctypes.byref(error))
        if is_kind_of_result == -1:
            raise GemstoneError(self, error)
        return bool(is_kind_of_result)

    def object_gemstone_class(self, instance):
        """
        Fetch the class of instance.

        :param instance: GemObject whose class is wanted
        :return: GemObject for the oop returned by GciTsFetchClass
        :raises GemstoneError: If the fetch fails
        """
        error = GciErrSType()
        return_oop = self.gci.GciTsFetchClass(self.c_session, instance.oop, ctypes.byref(error))
        if return_oop == OOP_ILLEGAL.value:
            raise GemstoneError(self, error)
        return self.get_or_create_gem_object(return_oop)

    def object_float_to_py(self, instance):
        """
        Convert a GemStone float to a Python float.

        :param instance: GemObject to convert
        :return: The double written by GciTsOopToDouble
        :raises GemstoneError: If the conversion fails
        """
        error = GciErrSType()
        result = ctypes.c_double()
        if not self.gci.GciTsOopToDouble(self.c_session, instance.oop, ctypes.byref(result), ctypes.byref(error)):
            raise GemstoneError(self, error)
        return result.value

    def object_string_to_py(self, instance):
        """
        Fetch a GemStone string as a Python str.

        The UTF-8 bytes are fetched in chunks of ``self.initial_fetch_size``
        until a short chunk is returned. GciTsFetchUtf8Bytes may store the oop
        of a helper object in its ``utf8_string`` output argument; that object
        is released afterwards, also when a fetch fails.

        :param instance: GemObject holding the string
        :return: The fetched bytes decoded as UTF-8
        :raises GemstoneError: If a fetch or the release fails
        """
        error = GciErrSType()
        start_index = 1
        num_bytes = self.initial_fetch_size
        bytes_returned = num_bytes
        fetched = bytearray()
        utf8_string = OopType(OOP_NIL.value)
        try:
            while bytes_returned == num_bytes:
                dest = (ByteType * (num_bytes + 1))()
                bytes_returned = self.gci.GciTsFetchUtf8Bytes(self.c_session, instance.oop, start_index,
                                                              dest, num_bytes, ctypes.byref(utf8_string),
                                                              ctypes.byref(error), 0)
                if bytes_returned == -1:
                    raise GemstoneError(self, error)
                fetched += bytearray(dest[:bytes_returned])
                start_index = start_index + num_bytes
        finally:
            if utf8_string.value != OOP_NIL.value:
                # A separate error struct keeps the one attached to an in-flight
                # GemstoneError untouched by the release call.
                release_error = GciErrSType()
                if not self.gci.GciTsReleaseObjs(self.c_session, ctypes.byref(utf8_string), 1, ctypes.byref(release_error)):
                    raise GemstoneError(self, release_error)
        return fetched.decode('utf-8')

    def object_latin1_to_py(self, instance):
        """Fetch the bytes of instance and decode them as latin-1."""
        return self.object_bytes_to_py(instance).decode('latin-1')

    def object_bytes_to_py(self, instance):
        """
        Fetch the bytes of a GemStone byte object.

        The bytes are fetched in chunks of ``self.initial_fetch_size`` until a
        short chunk is returned.

        :param instance: GemObject holding the bytes
        :return: The fetched bytes
        :raises GemstoneError: If a fetch fails
        """
        error = GciErrSType()
        start_index = 1
        num_bytes = self.initial_fetch_size
        bytes_returned = num_bytes
        fetched = bytearray()
        while bytes_returned == num_bytes:
            dest = (ByteType * (num_bytes + 1))()
            bytes_returned = self.gci.GciTsFetchBytes(self.c_session, instance.oop, start_index,
                                                      dest, num_bytes, ctypes.byref(error))
            if bytes_returned == -1:
                raise GemstoneError(self, error)
            fetched += bytearray(dest[:bytes_returned])
            start_index = start_index + num_bytes
        return bytes(fetched)

    def object_perform(self, instance, selector, *args):
        """
        Send a message to instance and return the answer.

        :param instance: GemObject that receives the message
        :param selector: The selector, as a str or as a GemObject
        :param args: GemObjects passed as the message arguments
        :return: GemObject for the oop returned by GciTsPerform
        :raises GemstoneApiError: If selector is not a str or GemObject
        :raises GemstoneError: If the perform fails
        """
        error = GciErrSType()
        if not isinstance(selector, (str, GemObject)):
            raise GemstoneApiError('Selector is type {}.Expected selector to be a str or GemObject'.format(selector.__class__.__name__))

        # Exactly one of the two selector forms is handed to the library.
        selector_oop = selector.oop if isinstance(selector, GemObject) else OOP_ILLEGAL
        selector_str = to_c_bytes(selector) if isinstance(selector, str) else None

        cargs = (OopType * len(args))(*[i.oop for i in args])
        flags = 1
        environment_id = 0
        return_oop = self.gci.GciTsPerform(self.c_session, instance.oop, selector_oop, selector_str,
                                           cargs, len(args), flags, environment_id, ctypes.byref(error))
        if return_oop == OOP_ILLEGAL.value:
            raise GemstoneError(self, error)
        return self.get_or_create_gem_object(return_oop)

    def object_continue_with(self, gemstone_process, continue_with_error_oop, replace_top_of_stack_oop):
        """
        Continue a GemStone process via GciTsContinueWith.

        :param gemstone_process: GemObject for the process to continue
        :param continue_with_error_oop: Passed as the fourth argument of GciTsContinueWith
        :param replace_top_of_stack_oop: Passed as the third argument of GciTsContinueWith
        :return: GemObject for the oop returned by GciTsContinueWith
        :raises GemstoneError: If the call fails
        """
        # NOTE(review): the binding declares the fourth argument as a pointer to
        # GciErrSType, so continue_with_error_oop presumably has to be None or
        # such a pointer - confirm against the callers.
        error = GciErrSType()
        return_oop = self.gci.GciTsContinueWith(self.c_session, gemstone_process.oop, replace_top_of_stack_oop,
                                                continue_with_error_oop, 0, ctypes.byref(error))
        if return_oop == OOP_ILLEGAL.value:
            raise GemstoneError(self, error)
        return self.get_or_create_gem_object(return_oop)

    def object_clear_stack(self, gemstone_process):
        """
        Clear the stack of a GemStone process via GciTsClearStack.

        :param gemstone_process: GemObject for the process
        :raises GemstoneError: If the call fails
        """
        error = GciErrSType()
        success = self.gci.GciTsClearStack(self.c_session, gemstone_process.oop, ctypes.byref(error))
        if not success:
            raise GemstoneError(self, error)
#======================================================================================================================