commit 0f984b2c33cbdcd0e09552f0556df6e9cf21ba6d Author: theolem Date: Sun Dec 8 17:01:53 2019 +0100 first commit diff --git a/$ b/$ new file mode 100755 index 0000000..4cf6b60 --- /dev/null +++ b/$ @@ -0,0 +1,109 @@ +#include +#include "master.h" + +int MASTERID = 0; +int activeNodes[10]; //nodes active in the network. Assumes that all nodes have a unique id. +char *currentPrograms[100]; //current programs : array of char pointers, each is a program string. + +void main() +{ + initCurrentProgramsArray(); + + int newProgramIndex = manageUserInput(); + printf("New program : %s \n", currentPrograms[newProgramIndex]); +/* + for(;;) + { + manageUserInput(); + + printf("Entrez le nom d'un programme : \n >> ") ; + scanf("%s", program ); + + + printf(" %d ", sizeof(program)); + addToCurrentPrograms(program); + + //calls roundRobin to decide where it should be executed + //sends it over the network + //gets program return output and print + + + }*/ +} + +void initCurrentProgramsArray() +{ + int i=0; + while(i better for recovery diff --git a/auto.sh b/auto.sh new file mode 100755 index 0000000..d66a0b5 --- /dev/null +++ b/auto.sh @@ -0,0 +1,2 @@ +make +./build/master diff --git a/build/master b/build/master new file mode 100755 index 0000000..3780f53 Binary files /dev/null and b/build/master differ diff --git a/build/slave b/build/slave new file mode 100755 index 0000000..5ac3f1a Binary files /dev/null and b/build/slave differ diff --git a/cc b/cc new file mode 100755 index 0000000..94e9a6d --- /dev/null +++ b/cc @@ -0,0 +1,14 @@ +hello +hello +hello +hello +hello +hello +hello +hello +hello +hello +hello +hello +hello +hello diff --git a/dependencies b/dependencies new file mode 100755 index 0000000..8dd3b04 --- /dev/null +++ b/dependencies @@ -0,0 +1,2 @@ +gcc +libc6-dev diff --git a/files/program b/files/program new file mode 100755 index 0000000..e69de29 diff --git a/hello b/hello new file mode 100755 index 0000000..e69de29 diff --git a/max] b/max] new file mode 100755 index 0000000..e69de29 diff --git a/scripts/copy.sh b/scripts/copy.sh new file mode 100755 index 0000000..bd9e0c2 --- /dev/null +++ b/scripts/copy.sh @@ -0,0 +1,3 @@ +# copies code from master to all slaves +scp -r $PWD/../* theo@192.168.0.2:~Documents/recupPC/ recupPC +scp -r $PWD/../* theo@192.168.0.3:~Documents/recupPC/ recupPC diff --git a/scripts/netconfig.sh b/scripts/netconfig.sh new file mode 100755 index 0000000..590f110 --- /dev/null +++ b/scripts/netconfig.sh @@ -0,0 +1,4 @@ +# add ip address to ethernet interface +ip addr add 192.168.0.$1 dev eth0 +ip link set eth0 up + diff --git a/scripts/wait.sh b/scripts/wait.sh new file mode 100755 index 0000000..d2e45d6 --- /dev/null +++ b/scripts/wait.sh @@ -0,0 +1,6 @@ +echo "Testing..." +for var in $@ +do + sleep $var +done +echo "Done." diff --git a/scripts/wait_parallel.sh b/scripts/wait_parallel.sh new file mode 100755 index 0000000..8492999 --- /dev/null +++ b/scripts/wait_parallel.sh @@ -0,0 +1,14 @@ +echo "Testing..." + +SLEEPMAX=1 + +for var in $@ +do + sleep $var & + if (( $var > $SLEEPMAX )); + then + SLEEPMAX=$var + fi +done +sleep "$SLEEPMAX" +echo "Done." diff --git a/src/__pycache__/util.cpython-36.pyc b/src/__pycache__/util.cpython-36.pyc new file mode 100755 index 0000000..6743be3 Binary files /dev/null and b/src/__pycache__/util.cpython-36.pyc differ diff --git a/src/balancer.py b/src/balancer.py new file mode 100755 index 0000000..5cb8021 --- /dev/null +++ b/src/balancer.py @@ -0,0 +1,123 @@ +import socket +import threading +import logging +import time +import sys +from util import * + +message_queue=[] +if len(sys.argv) > 1 : + args = sys.argv[1:] + for arg in args : + message_queue.append("prog : " + arg) +else : + message_queue=["prog : echo hello"] + +print(message_queue) + +connections_threads=[] # active clients +data_socks={} + +HOST = "127.0.0.1" +PORT = 1235 + +data_port = input("Port data ?") ## port de donnée utilisé pour le premier utilisateur (ensuite incrémenté par 1) +if data_port == "": + data_port = 43222 +else : + data_port = int(data_port) + +nb_connections = 0 +stop = 0 + +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + +### SERVER THREADS +class thread_get_input(threading.Thread): + def __init__(self) : + threading.Thread.__init__(self) + def run(self) : + while True : + program = get_program_input() + if program != "" : + message_queue.append("prog : " + program) + if stop == 1 : + break + +class thread_command_execution(threading.Thread) : + def __init__(self) : + threading.Thread.__init__(self) + self.round_robin_inc = 0 + def run(self) : + try : + while True : + if len(connections_threads) != 0 and len(message_queue) != 0 : + index = self.round_robin_inc % len(connections_threads) + thread = connections_threads[index] + self.round_robin_inc = self.round_robin_inc + 1 + program = message_queue.pop() + thread.command_program_execution(program) + if stop == 1 : + break + except KeyboardInterrupt : + self.s_data.close() + raise + +class thread_handle_socket(threading.Thread): + def __init__(self) : + threading.Thread.__init__(self) + + data_socks[data_port] = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.s_data = data_socks[data_port] + self.s_data.bind(('', data_port)) + self.s_data.listen() + self.conn, self.addr = self.s_data.accept() + + def command_program_execution(self, program): + self.conn.send(encode_message(program)) + start_time = time.time() + data = self.conn.recv(1024) + end_time = time.time() + mtype, ret_val = get_message_type(decode_message(data)) + print("Received " + str(ret_val) + " from " + str(self.addr[0]) + " in %s seconds ", end_time - start_time ) + + def run(self) : + while True : + time.sleep(1) + if stop == 1 : + break + +## MAIN + +# starts thread that reads input from user +thread_input = thread_get_input() +thread_input.start() + +thread_exec = thread_command_execution() +thread_exec.start() + +s.bind(('', PORT)) +s.listen() +try : + while True : + conn, addr = s.accept() + print("Nouvelle machine connectée : " + addr[0]) + #send data port to connected host + message = "prt : "+ str(data_port) + conn.sendall(encode_message(message)) + + # opens a new thread, which will open a data socket + connections_threads.append(thread_handle_socket()) + connections_threads[nb_connections].start() + + # increment it for the next one + data_port = data_port + 5; + nb_connections = nb_connections + 1 + + conn.close() + +except KeyboardInterrupt : + s.close() + stop = 1 + print(" Exiting...") + raise diff --git a/src/client.py b/src/client.py new file mode 100755 index 0000000..814ee17 --- /dev/null +++ b/src/client.py @@ -0,0 +1,86 @@ +import socket +import threading +import time +import sys +from util import * + +received_message_queue = [] +dataport = 0 +PORT = 1235 +stop = 0 + +empty_inc = 0 #inc var counting number of empty packets received + +# server control socket +if len(sys.argv) > 1: + SERVER=sys.argv[1] +else : + SERVER = "127.0.0.1" +print("server address: " , SERVER); + +### FUNCTIONS +def thread_execute_programs() : + while True : + if len(received_message_queue) != 0 : + print("handling message in thread") + message = received_message_queue.pop() + handle_message(message) + if stop == 1 : + break + time.sleep(1) + +## MAIN CODE +if __name__ == "__main__" : + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect((SERVER, PORT)) # connection to server. Crashes if refused. + + # first receives port and closes connection to control socket... + data = s.recv(1024) + dataport = get_data_port(data) + # close connection + s.shutdown(socket.SHUT_RDWR) + s.close() + + print("Trying to open connection to data socket... ") + # opens the new server socket + while True : + try : + + s_data = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s_data.connect((SERVER, dataport)) + break + except ConnectionRefusedError : + print("retrying...") + time.sleep(1) + + thread_exec = threading.Thread(target = thread_execute_programs, args=() ) + thread_exec.start() + + # then gets data + while True : + try : + print("Waiting for data from server...") + data = s_data.recv(1024) + + if len(data) == 0 : + empty_inc = empty_inc + 1 + else : + empty_inc = 0 + + if empty_inc > 10 : + raise KeyboardInterrupt + + message = decode_message(data) + ret_val = handle_message(message) + if ret_val is not None : + s_data.send(str.encode(ret_val)) + elif ret_val == "-1" : + print("Closing data socket...") + raise KeyboardInterrupt + + except KeyboardInterrupt : + s_data.close() + stop = 1 + print(" Exiting...") + raise diff --git a/src/master b/src/master new file mode 100755 index 0000000..3780f53 Binary files /dev/null and b/src/master differ diff --git a/src/master.c b/src/master.c new file mode 100755 index 0000000..8bbbb2c --- /dev/null +++ b/src/master.c @@ -0,0 +1,110 @@ +#include +#include +#include "master.h" + +#define MASTERID = 0; +#define MAX_PROGRAM_NUMBER = 10; +#define MAX_PROGRAM_STR_LEN = 15; +int activeNodes[10]; //nodes active in the network. Assumes that all nodes have a unique id. +//char currentPrograms[MAX_PROGRAM_NUMBER][MAX_PROGRAM_STR_LEN]; //current programs : array of char pointers, each is a program string. + +void main() +{ + for(;;) + { + char program[50]; + printf("Entrez un programme à exécuter : \n"); + scanf("%s", program ); + + //calls roundRobin to decide where it should be executed + //sends it over the network + //gets program return output and print + + + } +} +/* +void initCurrentProgramsArray() +{ + int i=0; + while(i + +int node_id ; // id of local node, passed as an argument from shell + +void main() +{ + // read id from command line + for(;;) + { + // receives message from master + // runs said program (already on local system) + // returns the return value to master + } +} + +char getsMessageFromMaster() +{ + // reads message from socket and returns a char[] or Message structure +} + +int returnProgramNameFromMessage(char message[]) +{ + // returns the formatted program string to be passed to system call +} + +int runProgram(char programName[], char args[]) +{ + // simple system call + return 1 ; // or any other return form. For the moment int. +} diff --git a/src/util.py b/src/util.py new file mode 100755 index 0000000..98d3c0b --- /dev/null +++ b/src/util.py @@ -0,0 +1,63 @@ +import re +import subprocess + +regexp = "(.*) : (.*)" +prog = re.compile(regexp) + + +def add_client(addr) : + connections.append(addr) + print(connections) + +def del_client(addr) : + if addr in connections : + connections.remove(addr) + print(connections) + +def get_program_input() : + message = input("Entrez un programme à exécuter...") + return message + +def print_program_return(addr, ret_val) : + print("The program executed from " + str(addr) + " return the value " + str(ret_val)) + +def get_message_type(message) : + result=prog.findall(message) + if len(result) == 0 : + return None, None + return str(result[0][0]), str(result[0][1]) + +def get_data_port(message) : + clr_msg = decode_message(message) + mtype, mbody = get_message_type(clr_msg) + return int(mbody) + +def encode_message(message) : + return str.encode(message) + +def decode_message(raw_message) : + return raw_message.decode("utf-8") + +def execute_program(program) : + to_exec = program # executer en tâche de fond. + ret_val = subprocess.call(program + "&", shell=True) + return "val : " + str(ret_val) + +def handle_message(message): + mtype, mbody = get_message_type(message) + if mtype is None : + return None + if mtype == "prog" : + print("Received prog " + mbody ) + ret_val = execute_program(mbody) + return ret_val + if mtype == "clo" : + return -1 + elif mbody == "ret" : + print(message) + return None + +def round_robin() : + roundrobin_inc = roundrobin_inc + 1 + print("Roundrobin... inc = " + str(roundrobin_inc)) + return roundrobin_inc