You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

469 lines
15 KiB

#include <inttypes.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <unistd.h>
#include "interface.h"
#ifdef debug
#define printdbg(A) fputs(A, stderr)
#define printfdbg(...) fprintf(stderr, __VA_ARGS__)
#else
#define printdbg(A) do {} while (0)
#define printfdbg(...) do {} while (0)
#endif
struct packet_t {
uint32_t exp_addr;
uint32_t dest_addr;
uint32_t msg_size;
uint32_t data_hash;
uint16_t pkt_serial;
uint16_t data_size;
void *data;
};
struct frame_t {
uint16_t size;
uint8_t token;
struct packet_t *packets;
};
uint32_t checksum(char* data, size_t size) {
uint32_t sum = 0;
size_t i=0;
for(; i+sizeof(uint32_t) < size; i += sizeof(uint32_t))
sum ^= *((uint32_t*) data+i);
char tmp[sizeof(uint32_t)] = {0};
for (size_t j=0; j<size-i; j++)
tmp[j] = data[i+j];
sum ^= *((uint32_t*) tmp);
return sum;
}
struct list_subscription_t list_subscription;
struct packet_t build_next_packet(uint32_t exp_addr, uint16_t max_size)
// Return a zero'ed packet if there is no packet to send.
{
static struct message_t current_message = {0,0,NULL};
static size_t current_message_offset = 0;
static uint16_t current_serial_number = 0;
if (max_size < sizeof(4 * sizeof(uint32_t) + 2 * sizeof(uint16_t)))
{
printdbg("Not enough space available to build a packet. Abort.\n");
return (struct packet_t) {0};
}
if (current_message.length == current_message_offset)
{
free(current_message.data_p);
current_message = (struct message_t) {0};
current_message_offset = 0;
current_serial_number = 0;
if(read_message(&list_subscription, &current_message) <= 0)
{
printdbg("No message to pick from the message queue.\n");
return (struct packet_t) {0};
}
}
uint16_t size = (current_message.length - current_message_offset < max_size) ?
(current_message.length - current_message_offset) :
max_size;
struct packet_t packet = {
exp_addr,
current_message.addr,
current_message.length,
checksum(current_message.data_p + current_message_offset, size),
current_serial_number ++,
size,
current_message.data_p + current_message_offset
};
current_message_offset += size;
return packet;
}
size_t packet_to_raw(const struct packet_t* packet, void** output)
/* Returns the number of bytes of the new raw message.
Returns 0 on an error.
Remember to free *output. */
{
size_t size = 4*sizeof(uint32_t) + 2*sizeof(uint16_t) + packet->data_size;
void* tmp = *output = malloc(4*sizeof(uint32_t) + 2*sizeof(uint16_t) + packet->data_size);
if (tmp == NULL) // Unable to malloc
return 0;
//#warning Ordre valide, selon la spec ?
*((uint32_t*) tmp) = htonl(packet->exp_addr);
tmp += sizeof(uint32_t);
*((uint32_t*) tmp) = htonl(packet->dest_addr);
tmp += sizeof(uint32_t);
*((uint32_t*) tmp) = htonl(packet->msg_size);
tmp += sizeof(uint32_t);
*((uint32_t*) tmp) = packet->data_hash;
tmp += sizeof(uint32_t);
*((uint16_t*) tmp) = htons(packet->pkt_serial);
tmp += sizeof(uint16_t);
*((uint16_t*) tmp) = htons(packet->data_size);
tmp += sizeof(uint16_t);
memcpy(tmp, packet->data, packet->data_size);
return size;
}
int packet_of_raw(const void* raw_packet, struct packet_t* output_packet)
/* Returns 0 on success.
Remember to free output_packet->data. */
{
const void* tmp = raw_packet;
//#warning Ordre valide, selon la spec ?
output_packet->exp_addr = ntohl(*((uint32_t*) tmp));
tmp += sizeof(uint32_t);
output_packet->dest_addr = ntohl(*((uint32_t*) tmp));
tmp += sizeof(uint32_t);
output_packet->msg_size = ntohl(*((uint32_t*) tmp));
tmp += sizeof(uint32_t);
output_packet->data_hash = *((uint32_t*) tmp);
tmp += sizeof(uint32_t);
output_packet->pkt_serial = ntohs(*((uint16_t*) tmp));
tmp += sizeof(uint16_t);
output_packet->data_size = ntohs(*((uint16_t*) tmp));
tmp += sizeof(uint16_t);
if (output_packet->data_size == 0)
output_packet->data = NULL;
else if ((output_packet->data = malloc(output_packet->data_size)) == NULL)
return 1;
memcpy(output_packet->data, tmp, output_packet->data_size);
return 0;
}
void process_token_frame(struct frame_t* frame, uint32_t our_address, const unsigned int available_size)
/* Does not check *frame is really a raw token frame. Assumes the user won't be stupid by passing a discovery frame. */
{
struct packet_t packet;
int count = 0;
void* current_read_position = (void*) frame->packets;
void* current_write_position = current_read_position;
// First step: read all packets
while (current_read_position - (void*) (frame->packets) < frame->size - sizeof(uint16_t) - sizeof(uint8_t))
{
count ++;
printfdbg("%p\t%p\t%d\n", current_read_position, current_write_position, count);
if (packet_of_raw(current_read_position, &packet) != 0)
{
//#warning Maybe change current_read_position?
printdbg("Error: process_raw_token_frame: packet_of_raw returned an error.");
continue;
}
if (packet.data_hash != checksum(packet.data, packet.data_size))
{
//#warning Maybe change current_read_position? (again)
printdbg("Error: invalid checksum. Deleting the packet.\n");
}
//else
{
if (packet.dest_addr == our_address)
{
// Process the packet
printdbg("That packet for me, so I'll process it.\n");
struct message_t message = {
packet.exp_addr,
packet.data_size,
packet.data
};
printfdbg("%d\n", packet.exp_addr);
printfdbg("%d\n", packet.data_size);
for (size_t i = 0; i<packet.data_size; i++)
printfdbg("%c", ((char*) packet.data)[i]);
printfdbg("\n");
send_message(&list_subscription, &message);
// then delete it
}
else if (packet.exp_addr == our_address)
{
// Delete the packet
printdbg("That packet is mine, so I'll drop it.\n");
}
else
{
printdbg("Forwarding the packet\n");
// Keep the packet in the frame
memmove(current_write_position,
current_read_position,
packet.data_size
+ sizeof(packet.exp_addr)
+ sizeof(packet.dest_addr)
+ sizeof(packet.msg_size)
+ sizeof(packet.data_hash)
+ sizeof(packet.pkt_serial)
+ sizeof(packet.data_size));
current_write_position +=
packet.data_size
+ sizeof(packet.exp_addr)
+ sizeof(packet.dest_addr)
+ sizeof(packet.msg_size)
+ sizeof(packet.data_hash)
+ sizeof(packet.pkt_serial)
+ sizeof(packet.data_size);
}
current_read_position +=
packet.data_size
+ sizeof(packet.exp_addr)
+ sizeof(packet.dest_addr)
+ sizeof(packet.msg_size)
+ sizeof(packet.data_hash)
+ sizeof(packet.pkt_serial)
+ sizeof(packet.data_size);
}
free(packet.data);
}
// Second step: fill the trame with to-send packets
while (current_write_position - (void*) (frame->packets) < available_size - sizeof(uint16_t) - sizeof(uint8_t))
{
packet = build_next_packet(our_address, (void*) (frame->packets) + available_size - current_write_position);
if (packet.data == NULL) // Si on ne peut pas prendre de paquet, on s'arrête.
break;
if (packet.data_size == 0)
printdbg("\tpacket.data_size == 0 !!!\n");
void* raw_packet;
size_t packet_size = packet_to_raw(&packet, &raw_packet);
memcpy(current_write_position, raw_packet, packet_size);
current_write_position += packet_size;
free(raw_packet);
}
// Third step: fill frame fields
frame->size = current_write_position - (void*) (frame->packets) + sizeof(uint16_t) + sizeof(uint8_t);
frame->token = 1;
}
void fill_discovery_frame(void* output_raw, uint32_t address)
/* Fill a raw frame with a new discovery frame. */
{
*((uint16_t*) output_raw) = sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint32_t);
output_raw += sizeof(uint16_t);
*((uint8_t*) output_raw) = 0;
output_raw += sizeof(uint8_t);
*((uint32_t*) output_raw) = address;
}
struct layer1_parm_t { in_addr_t next_node_ip; uint16_t next_node_port, node_port; uint32_t node_address; };
void *layer1_frame(void *parm) {
struct layer1_parm_t *layer_parms = (struct layer1_parm_t*)parm;
/* socket UDP */
int udpsock = socket(PF_INET, SOCK_DGRAM, 0);
if(udpsock == -1) {
perror("socket");
exit(1);
}
/* définir les buffers d'émission/réception à 64k (moins la taille header UDP) */
const int max_frame_size = 65507;
setsockopt(udpsock, SOL_SOCKET, SO_SNDBUF, &max_frame_size, sizeof(int));
setsockopt(udpsock, SOL_SOCKET, SO_RCVBUF, &max_frame_size, sizeof(int));
{
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
setsockopt(udpsock, SOL_SOCKET, SO_RCVTIMEO, (char*) &tv, sizeof(tv));
}
/* adresses "physiques" UDP */
struct sockaddr_in ip_next = {0}, ip_prev = {0};
ip_next.sin_family = ip_prev.sin_family = AF_INET; /* IPv4 */
ip_next.sin_addr.s_addr = layer_parms->next_node_ip; /* adresse du nœud suivant */
ip_next.sin_port = layer_parms->next_node_port; /* port d’écoute du nœud suivant */
ip_prev.sin_addr.s_addr = htonl(INADDR_ANY); /* adresse du nœud précédent (indéterminée) */
ip_prev.sin_port = layer_parms->node_port; /* notre port d’écoute */
/* écouter sur le socket pour le nœud précédent */
if(bind(udpsock, (struct sockaddr*)(&ip_prev), sizeof(struct sockaddr)) == -1) {
/* si déjà pris */
perror("bind");
exit(1);
}
char rawframe[max_frame_size];
enum { quit, discovery, ring } mode = discovery;
// lancer le thread watchdog en lui fournissant &mode(+mutex?) et un mutex ?
do {
/* on attend de recevoir une trame */
ssize_t recvframe_size = recvfrom(udpsock, rawframe, max_frame_size, 0, NULL, NULL);
if(recvframe_size == -1 )
{
if (errno != EAGAIN && errno != EWOULDBLOCK)
{
perror("recvfrom");
break;
}
/* On a perdu la trame => mode découverte.
On attend un temps aléatoire, puis on envoie une trame de découverte. */
printdbg("Timeout: discovery mode\n");
mode = discovery;
printdbg("Temps aléatoire...\n");
usleep(rand() % 1000000);
printdbg("Fin temps aléatoire\n");
fill_discovery_frame(rawframe, layer_parms->node_address);
/* on envoie la trame */
if(sendto(udpsock, rawframe, sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint32_t), 0, (struct sockaddr*)&ip_next, sizeof(struct sockaddr)) == -1)
perror("sendto");
printdbg("Discovery frame sent\n");
}
else {
/* on a obtenu la trame */
size_t read_pos = 0;
struct frame_t frame;
frame.size = ntohs( *((uint16_t*)(rawframe+read_pos)) );
read_pos += sizeof(uint16_t);
frame.token = *((uint8_t*)(rawframe+read_pos));
read_pos += sizeof(uint8_t);
frame.packets = (void*) rawframe + read_pos;
/* Si on reçoit une trame trop courte pour recevoir un jeton, on jette la trame et on passe en mode découverte. */
if (recvframe_size < sizeof(frame.size) + sizeof(frame.token))
{
printdbg("Received too short frame: discovery mode\n");
mode = discovery;
continue;
}
if (!frame.token) // Si c'est une trame de découverte
{
printdbg("Received discovery frame\n");
if (mode == ring) // Les trames de découverte sont jetées si on est en mode ring.
{
printdbg("Dropped discovery frame (we are in token mode)\n");
continue;
}
uint32_t addr = *((uint32_t*)(rawframe+read_pos)); /* on extrait l'adresse de la trame de découverte */
read_pos += sizeof(uint32_t);
if (addr == layer_parms->node_address) // Si l'adresse contenue dans la trame de découverte est la notre
{
printdbg("Oh my god! This discovery frame is mine!\n");
/* On a reçu notre propre trame de découverte, elle a donc fait le tour de l'anneau.
On passe en mode anneau et on envoie la première trame à jeton. */
frame.token = 1;
frame.size = sizeof(frame.size) + sizeof(frame.token);
}
else
{
/* On doit juste renvoyer la trame. Ce bloc « else » est peut-être destiné à disparaître. */
printdbg("Forwarding discovery frame\n");
fprintf(stderr, "%u\t%u\n", addr, layer_parms->node_address);
}
}
if (frame.token) // Si on a reçu une trame à jeton
{
printdbg("Received token frame\n");
/* Traitons-la ! */
mode = ring;
process_token_frame(&frame, layer_parms->node_address, max_frame_size - sizeof(frame.token) - sizeof(frame.size));
}
/* On remet la frame dans rawframe */
read_pos = 0;
*((uint16_t*)(rawframe+read_pos)) = htons(frame.size);
read_pos += sizeof(uint16_t);
*((uint8_t*)(rawframe+read_pos)) = frame.token;
read_pos += sizeof(uint8_t);
memcpy((void*) (frame.packets), (void*) (rawframe + read_pos), frame.size);
/* on renvoie la trame */
if(sendto(udpsock, rawframe, frame.size, 0, (struct sockaddr*)&ip_next, sizeof(struct sockaddr)) == -1)
perror("sendto");
usleep(100000);
}
} while(mode != quit);
/* fermeture socket */
close(udpsock);
return NULL;
}
int is_ushort(char *str) {
char *error = str;
long int val = strtol(str, &error, 10);
return ( error != str && val > 0 && val < 65536 );
}
int is_uint32(char *str) {
char *error = str;
long int val = strtol(str, &error, 10);
//#warning dépassement de l’int
return ( error != str && val > 0 && val < 4294967296 );
}
int main(int argc, char **argv) {
uint32_t id;
if (argc < 6 || !is_ushort(argv[2]) || !is_ushort(argv[3]) || !char_hexa_to_uint32(argv[4], &id) || !is_ushort(argv[5])) {
puts("parms: <next node IP> <next node port> <our node port> <our ID> <subscribe port> ");
return 1;
}
/* threads de la couche 1, transmission de la trame */
struct layer1_parm_t layer1_parm;
layer1_parm.next_node_ip = inet_addr(argv[1]);
layer1_parm.next_node_port = htons(atoi(argv[2]));
layer1_parm.node_port = htons(atoi(argv[3]));
layer1_parm.node_address = id;
printfdbg("Subscribe port: %d\n", atoi(argv[5]));
/* Initialisation de la liste d'inscriptions */
list_subscription_init(&list_subscription, atoi(argv[5]));
pthread_t layer1_thread; /* réception/réémission trame */
pthread_create(&layer1_thread, NULL, layer1_frame, &layer1_parm);
/* fin du programme */
pthread_join(layer1_thread, NULL);
/* Desctruction de la liste d'inscriptions */
list_subscription_destroy(&list_subscription);
return 0;
}