Esperimenti di networking con Python socket

Python supporta nativamente i socket, ovvero la possibilità di instaurare una connessione diretta tra due applicativi mediante connessione diretta attraverso il protocollo IP. Questo consente di realizzare in maniera (relativamente) semplice applicazioni in grado di comunicare in maniera continua tra loro attraverso la rete.

Questo si presta particolarmente bene per la realizzazione di giochi multiplayer, in cui la comunicazione dovrebbe essere continua e con la minor latenza possibile (il che esclude, per esempio, la comunicazione mediante http).

La tipica architettura prevede l'esistenza di un server centrale a cui i client si connettono: i client trasmettono ciascuno il proprio stato al server (ad esempio, la posizione del giocatore), e il server lo reinvia a tutti gli altri, che in questo modo possono restare sincronizzati.

In questo piccolo esperimento voglio provare a definire un semplice esempio di architettura client/server. Il server resta in ascolto della richiesta di connessione dei client, e una volta instaurata gestisce ciascuna connessione attraverso un diverso thread.

Il Server

Iniziamo con il server, che sarà tutto racchiuso in una classe. Per prima cosa vogliamo definire il metodo di inizializzazione della classe, che si occuperà di inizializzare il socket. Già che ci siamo definiamo come proprietà della classe un paio di dizionari per contenere i thread che andremo a creare e i relativi dati scambiati da ciascun client. Impostiamo con settimeout un timeout di connessione di mezzo secondo (il perchè è spiegato più avanti).

class ThreadedServer(object):

    clients_data = {}
    clients_list = {}

    def __init__(self, host, port):

        self.host = host
        self.port = port
        
        print(f"Apertura socket su porta {self.port}...")
        
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.settimeout(0.5)
        self.sock.bind((self.host, self.port))

Il metodo centrale di questa classe sarà quello che andrà ad ascoltare le richieste di connessione in ingresso e le passerà a un thread individuale.

    def listen(self):
                
        self.sock.listen(5)

        listening:bool = True
        print("attesa connessione...")

        while listening:

            try:
                
                client, address = self.sock.accept()
                client.settimeout(60)
                
                print("client connesso!")
                
                new_client_id = str(uuid.uuid4())
                new_thread = threading.Thread(
                    target = self.manage_client_connection, 
                    args = (client, address, new_client_id), 
                    daemon=True
                )
                new_thread.start()
                self.clients_list[new_client_id] = new_thread

            except socket.timeout:
                # print("Timeout")
                pass
            except KeyboardInterrupt:
                listening = False

        print("Thread principale terminato!")

Un paio di osservazioni:

  • Il metodo listen inizializza il socket in ascolto. Il parametro è il numero massimo di connessioni contemporanee supportate (in questo caso 5).
  • Il ciclo di attesa è un loop infinito. Usiamo una variabile booleana (listening) per sorvegliarne l'esecuzione.
  • Il metodo accept è quello che effettivamente blocca il programma in attesa di connessione. Durante l'inizializzazione abbiamo impostato un timeout sulla fase di connessione (0,5 secondi), che ci consente di far avanzare il loop (sollevando una TimeoutException) anche in assenza di connessioni: in questo modo possiamo sorvegliare la nascita di un'eventuale eccezione di tipo KeyboardError. Questa eccezione avviene quando l'utente preme Control+C, e in questo caso mettiamo a False la variabile di ciclo e possiamo così chiudere il programma; in assenza di questo stratagemma il programma, in assenza di connessioni, rimarrebbe in eterno fermo in attesa e non sarebbe possibile terminarlo da tastiera. Naturalmente, le eccezioni dovute a timeout sono previste e quindi semplicemente ignorate: non è il metodo più efficiente o elegante ma ci accontentiamo.
  • Quando viene accettata una connessione, viene generato un id casuale (per identificarla in maniera univoca) con il metodo uuid4. Viene poi costruito e avviato un thread attorno al metodo manage_client_connection, a cui viene passata la nuova connessione. A questo punto il programma principale può dimenticarsi di questa connessione, che sarà d'ora in avanti gestita dal thread, e rimettersi in ascolto della successiva.

A questo punto non ci resta che realizzare il metodo manage_client_connection, che fa il grosso del lavoro per ciascuno dei socket.

    def manage_client_connection(self, client:socket.socket, address, client_id:str):

        size = 1024

        # Handshake
        data = msgpack.unpackb(client.recv(size))
        print("Received handshake: ", data)
        handshake_reply = {
            "id": client_id
        }
        client.send(msgpack.packb(handshake_reply))

        while True:

            try:
                
                data = client.recv(size)

                if data:
                    
                    self.clients_data[client_id] = msgpack.unpackb(data)

                    client.send(msgpack.packb(self.clients_data))

                else:
                    raise Exception('Client disconnected')
                
            except Exception as e:

                print(e)
                client.close()
                del self.clients_data[client_id]
                return False

La comunicazione attraverso il socket è svolta usando la libreria msgpack, che consente di convertire in un formato binario dei flussi JSON. Questo fa si che la comunicazione sia estremamente più compatta e quindi efficiente (comunicare direttamente mediante stringhe JSON è ESTREMAMENTE poco efficiente).

Quando la connessione viene instaurata viene effettuato un handshake: si suppone che il client appena connesso ci mandi un pacchetto dati (che al momento è ignorato, ma è utile predisporlo per il futuro, un domani potrebbe contenere per esempio le credenziali dell'utente del client, o la versione, eccetera) a cui noi rispondiamo con un pacchetto dati contenente l'ID che abbiamo assegnato in precedenza a questa specifica connessione.

A questo punto inizia il loop infinito: il server resta in attesa di dati dal client, e quando li riceve li salva nel dizionario definito nella classe principale, indicizzandolo con l'uuid della connessione. Una volta ricevuto l'aggiornamento poi mandiamo al client i dati di TUTTI i client, in questo modo il client è sincronizzato con gli stati di tutti gli altri client connessi; ad esempio, possiamo immaginare che, nel caso di un gioco multiplayer, ogni client mandi al server la posizione (x,y) del proprio giocatore, e in cambio riceva le posizioni di tutti gli altri per poterli disegnare correttamente.

La classe del server è quindi terminata, e la possiamo avviare semplicemente così:

if __name__ == "__main__":
    ThreadedServer('', 12345).listen()

Il Client

Il client è un pò più complesso perchè deve contenere, oltre alla parte di networking, tutta la logica di gioco. In questo esempio ci focalizziamo solo sulla componente di networking, e il prototipo di gioco attorno lo lasciamo al prossimo post. Dentro il client quindi ci sarà una classe che si occuperà di gestire la comunicazione con il server.

La classe definisce una manciata di attributi, commentati qui sotto, e un metodo di inizializzazione che semplicemente imposta host e porta del server a cui connettersi.

class NetworkConnection:
   
  client_socket = None
  thread = None

  # Il client deve tentare di restare connesso
  _stay_connected = False

  # Stato connessione
  _is_connected = False

  # ID assegnato durante handshake
  player_id = None

  # Ultimo payload ricevuto dal server
  received_data = {}


  def __init__(self, host, port):
    self.host = host
    self.port = port
    self._stay_connected = False

Definiamo un metodo "connect", che verrà chiamato per attivare la connessione vera e propria, e un metodo "disconnect" che si spiega da solo. Il metodo connect si occupa semplicemente di creare un thread attorno al metodo "_worker", che gestisce le attività legate alla connessione in maniera autonoma. Viene inoltre garantito che il thread sia unico, se esiste già il "connect" lo distrugge prima di ricrearlo.

  def connect(self):
  
    self._is_connected = False
    
    # Si assicura che eventuale thread esistente sia chiuso
    if (self.thread != None and self.thread.is_alive()):
      self._stay_connected = False
      while(self.thread.is_alive()):
        time.sleep(.1)
  
    self._stay_connected = True
  
    self.thread = threading.Thread(target=self._worker, args=(), daemon=True)
    self.thread.start()
  
  
  def disconnect(self):
    self._stay_connected = False
    
  def is_connected(self):
    return self._is_connected

A questo punto dobbiamo solo definire il cuore della classe, il metodo "_worker". Questo metodo, che come abbiamo visto è eseguito in un thread, è costruito su due loop infiniti entrambi controllati dalla variabile "stay_connected".

Il loop interno è in ascolto sul canale di comunicazione, e ogni volta che riceve dati li salva nella variabile d'istanza self.received_data (accessibile dal resto del programma).

    while(self._stay_connected):
      
      ...
      
      while(self._stay_connected):

        data = self.client_socket.recv(1024)

        try:
          self.received_data = msgpack.unpackb(data)
        except Exception:
          _logger.exception("Errore durante socket recv")
          break

Il loop esterno, invece, entra in gioco all'avvio del thread e in tutte le eventuali situazioni in cui in loop interno dovesse morire a causa di errori di connessione o simili. Questo blocco si occupa di effettuare la connessione vera e propria al socket ed eseguire l'handshake (come già visto dal punto di vista del server).

    while(self._stay_connected):
      
      ...
      self._connect_socket()
      ...
      
      ...
      self._handshake()
      ...
      
      self._is_connected = True

      while(self._stay_connected):
        ...

In ogni caso, entrambi i loop terminano quando la variabile viene messa a un valore False. Mettendo attorno un pò di gestione di errori e l'implementazione dei metodi di connessione e handshake otteniamo questo codice:

  def _connect_socket(self):
    self.client_socket = socket.socket()
    self.client_socket.connect((self.host, self.port))


  def _handshake(self):

    _logger.debug("Effettuazione handshake...")

    handshake_payload = {
      "text": "hi"
    }
    
    self.client_socket.send(msgpack.packb(handshake_payload))
    handshake_recv = msgpack.unpackb(self.client_socket.recv(1024))
    self.player_id = handshake_recv['id']
    
    _logger.debug(f"Server assigned me this id: {self.player_id}")


  # Thread principale
  def _worker(self):

    while(self._stay_connected):
      
      try:
        self._connect_socket()
      except Exception:
        _logger.exception("Connessione fallita!")
        time.sleep(1)
        self.client_socket = None
        self.thread = None
        return

      try:
        self._handshake()
      except Exception:
        _logger.exception("Handshake fallito!")
        time.sleep(1)
        self.client_socket = None
        self.thread = None
        return
      
      self._is_connected = True

      while(self._stay_connected):

        data = self.client_socket.recv(1024)

        try:
          self.received_data = msgpack.unpackb(data)
        except Exception:
          _logger.exception("Errore durante socket recv")
          break

    try:
      self.client_socket.close()
    except Exception as _:
      pass
    
    self._is_connected = False

    self.client_socket = None
    self.thread = None

    _logger.debug("Client thread terminato")

Per completare la classe, creiamo anche un metodo che può essere richiamato dal programma principale per inviare dati (finora abbiamo visto solo la ricezione).

  def send(self, data:dict):

    if (self.thread == None or not self.thread.is_alive()):
      _logger.warning("send while thread not alive, reconnecting...")
      self.connect()

    if (self.client_socket):
      try:
        packed_data = msgpack.packb(data)
        self.client_socket.send(packed_data)
      except Exception:
        # _logger.exception("Impossibile trasmettere")
        pass

Il Test

A questo punto dovremmo avere tutto pronto per testare. Costruiamo un semplicissimo script che:

  • inizializza la classe ed effettua la connessione
  • chiede all'utente di inserire un messaggio
  • invia un messaggio
  • attende ulteriore conferma dall'utente (simula delay tra invio e ricezione)
  • stampa client.received_data, in cui mi aspetto di trovare il messaggio che ho inviato in precedenza.
client = NetworkConnection(socket.gethostname(), 12345)
client.connect()

messaggio = input("Inserire messaggio:")

client.send(messaggio)

input("...")

print(client.received_data)

Per prima cosa avviamo il server:

Poi avviamo il client. Questo si connette immediatamente al server, effettua l'handshake e riceve un id univoco:

Inseriamo un messaggio qualunque nel client:

Premiamo ancora invio per superare il secondo input e vediamo che il nostro client ha ricevuto dal server il messaggio precedentemente inviato:

Il primo client è terminato e la sua connessione è quindi stata chiusa (come mostra l'errore nella finestra del server).

Conclusione

Il codice sopra è estremamente grezzo, con scarsa gestione degli errori e dei casi limite, e funzionicchia dignitosamente ma senza pretese. Diciamo che è un buon esempio accademico. Il prossimo passo sarà provare a inserirlo in un programma Pygame per provare a metterlo alla prova con qualcosa di più visivo.

Grazie per l'attenzione e alla prossima!