You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

470 lines
15KB

  #include <errno.h>
  #include <inttypes.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>

  #include <sys/socket.h>
  #include <sys/time.h>
  #include <netinet/in.h>
  #include <arpa/inet.h>
  #include <pthread.h>
  #include <unistd.h>

  #include "interface.h"
  /* Debug tracing helpers.
     When the `debug` macro is defined, printdbg() writes a fixed string and
     printfdbg() a printf-style message to stderr; otherwise both expand to an
     empty statement, so calls cost nothing. */
  #if !defined(debug)
  #define printdbg(A) do {} while (0)
  #define printfdbg(...) do {} while (0)
  #else
  #define printdbg(A) fputs(A, stderr)
  #define printfdbg(...) fprintf(stderr, __VA_ARGS__)
  #endif
  /* One packet carried inside a token frame.
     Field order matters: the file builds packets with positional
     initialisers. */
  struct packet_t {
      uint32_t exp_addr;    /* address of the sending node */
      uint32_t dest_addr;   /* address of the destination node */
      uint32_t msg_size;    /* length of the whole message this packet belongs to */
      uint32_t data_hash;   /* checksum() of the payload bytes */
      uint16_t pkt_serial;  /* sequence number of this packet within its message */
      uint16_t data_size;   /* number of payload bytes pointed to by data */
      void    *data;        /* payload bytes (data_size of them) */
  };
  /* Decoded header of a frame travelling around the ring. */
  struct frame_t {
      uint16_t size;              /* total frame length in bytes, header included */
      uint8_t token;              /* non-zero: token frame; zero: discovery frame */
      struct packet_t *packets;   /* start of the raw bytes that follow the header */
  };
  uint32_t checksum(char* data, size_t size)
  /* XOR-folds `size` bytes of `data` into one 32-bit value.
     The buffer is consumed as consecutive 4-byte words in host memory order;
     a trailing partial word is zero-padded. Returns 0 for an empty buffer.
     `data` may have any alignment. */
  {
      uint32_t sum = 0;
      uint32_t word;
      size_t offset = 0;

      /* memcpy avoids the misaligned / type-punned loads a cast would do, and
         `data + offset` is a BYTE offset (the previous cast-then-add indexed
         whole words and read past the end of the buffer). */
      while (size - offset >= sizeof word)
      {
          memcpy(&word, data + offset, sizeof word);
          sum ^= word;
          offset += sizeof word;
      }

      /* Zero-padded tail of 1 to 3 bytes. */
      if (offset < size)
      {
          word = 0;
          memcpy(&word, data + offset, size - offset);
          sum ^= word;
      }
      return sum;
  }
  44. struct list_subscription_t list_subscription;
  45. struct packet_t build_next_packet(uint32_t exp_addr, uint16_t max_size)
  46. // Return a zero'ed packet if there is no packet to send.
  47. {
  48. static struct message_t current_message = {0,0,NULL};
  49. static size_t current_message_offset = 0;
  50. static uint16_t current_serial_number = 0;
  51. if (max_size < sizeof(4 * sizeof(uint32_t) + 2 * sizeof(uint16_t)))
  52. {
  53. printdbg("Not enough space available to build a packet. Abort.\n");
  54. return (struct packet_t) {0};
  55. }
  56. if (current_message.length == current_message_offset)
  57. {
  58. free(current_message.data_p);
  59. current_message = (struct message_t) {0};
  60. current_message_offset = 0;
  61. current_serial_number = 0;
  62. if(read_message(&list_subscription, &current_message) <= 0)
  63. {
  64. printdbg("No message to pick from the message queue.\n");
  65. return (struct packet_t) {0};
  66. }
  67. }
  68. uint16_t size = (current_message.length - current_message_offset < max_size) ?
  69. (current_message.length - current_message_offset) :
  70. max_size;
  71. struct packet_t packet = {
  72. exp_addr,
  73. current_message.addr,
  74. current_message.length,
  75. checksum(current_message.data_p + current_message_offset, size),
  76. current_serial_number ++,
  77. size,
  78. current_message.data_p + current_message_offset
  79. };
  80. current_message_offset += size;
  81. return packet;
  82. }
  83. size_t packet_to_raw(const struct packet_t* packet, void** output)
  84. /* Returns the number of bytes of the new raw message.
  85. Returns 0 on an error.
  86. Remember to free *output. */
  87. {
  88. size_t size = 4*sizeof(uint32_t) + 2*sizeof(uint16_t) + packet->data_size;
  89. void* tmp = *output = malloc(4*sizeof(uint32_t) + 2*sizeof(uint16_t) + packet->data_size);
  90. if (tmp == NULL) // Unable to malloc
  91. return 0;
  92. //#warning Ordre valide, selon la spec ?
  93. *((uint32_t*) tmp) = htonl(packet->exp_addr);
  94. tmp += sizeof(uint32_t);
  95. *((uint32_t*) tmp) = htonl(packet->dest_addr);
  96. tmp += sizeof(uint32_t);
  97. *((uint32_t*) tmp) = htonl(packet->msg_size);
  98. tmp += sizeof(uint32_t);
  99. *((uint32_t*) tmp) = packet->data_hash;
  100. tmp += sizeof(uint32_t);
  101. *((uint16_t*) tmp) = htons(packet->pkt_serial);
  102. tmp += sizeof(uint16_t);
  103. *((uint16_t*) tmp) = htons(packet->data_size);
  104. tmp += sizeof(uint16_t);
  105. memcpy(tmp, packet->data, packet->data_size);
  106. return size;
  107. }
  108. int packet_of_raw(const void* raw_packet, struct packet_t* output_packet)
  109. /* Returns 0 on success.
  110. Remember to free output_packet->data. */
  111. {
  112. const void* tmp = raw_packet;
  113. //#warning Ordre valide, selon la spec ?
  114. output_packet->exp_addr = ntohl(*((uint32_t*) tmp));
  115. tmp += sizeof(uint32_t);
  116. output_packet->dest_addr = ntohl(*((uint32_t*) tmp));
  117. tmp += sizeof(uint32_t);
  118. output_packet->msg_size = ntohl(*((uint32_t*) tmp));
  119. tmp += sizeof(uint32_t);
  120. output_packet->data_hash = *((uint32_t*) tmp);
  121. tmp += sizeof(uint32_t);
  122. output_packet->pkt_serial = ntohs(*((uint16_t*) tmp));
  123. tmp += sizeof(uint16_t);
  124. output_packet->data_size = ntohs(*((uint16_t*) tmp));
  125. tmp += sizeof(uint16_t);
  126. if (output_packet->data_size == 0)
  127. output_packet->data = NULL;
  128. else if ((output_packet->data = malloc(output_packet->data_size)) == NULL)
  129. return 1;
  130. memcpy(output_packet->data, tmp, output_packet->data_size);
  131. return 0;
  132. }
  133. void process_token_frame(struct frame_t* frame, uint32_t our_address, const unsigned int available_size)
  134. /* Does not check *frame is really a raw token frame. Assumes the user won't be stupid by passing a discovery frame. */
  135. {
  136. struct packet_t packet;
  137. int count = 0;
  138. void* current_read_position = (void*) frame->packets;
  139. void* current_write_position = current_read_position;
  140. // First step: read all packets
  141. while (current_read_position - (void*) (frame->packets) < frame->size - sizeof(uint16_t) - sizeof(uint8_t))
  142. {
  143. count ++;
  144. printfdbg("%p\t%p\t%d\n", current_read_position, current_write_position, count);
  145. if (packet_of_raw(current_read_position, &packet) != 0)
  146. {
  147. //#warning Maybe change current_read_position?
  148. printdbg("Error: process_raw_token_frame: packet_of_raw returned an error.");
  149. continue;
  150. }
  151. if (packet.data_hash != checksum(packet.data, packet.data_size))
  152. {
  153. //#warning Maybe change current_read_position? (again)
  154. printdbg("Error: invalid checksum. Deleting the packet.\n");
  155. }
  156. //else
  157. {
  158. if (packet.dest_addr == our_address)
  159. {
  160. // Process the packet
  161. printdbg("That packet for me, so I'll process it.\n");
  162. struct message_t message = {
  163. packet.exp_addr,
  164. packet.data_size,
  165. packet.data
  166. };
  167. printfdbg("%d\n", packet.exp_addr);
  168. printfdbg("%d\n", packet.data_size);
  169. for (size_t i = 0; i<packet.data_size; i++)
  170. printfdbg("%c", ((char*) packet.data)[i]);
  171. printfdbg("\n");
  172. send_message(&list_subscription, &message);
  173. // then delete it
  174. }
  175. else if (packet.exp_addr == our_address)
  176. {
  177. // Delete the packet
  178. printdbg("That packet is mine, so I'll drop it.\n");
  179. }
  180. else
  181. {
  182. printdbg("Forwarding the packet\n");
  183. // Keep the packet in the frame
  184. memmove(current_write_position,
  185. current_read_position,
  186. packet.data_size
  187. + sizeof(packet.exp_addr)
  188. + sizeof(packet.dest_addr)
  189. + sizeof(packet.msg_size)
  190. + sizeof(packet.data_hash)
  191. + sizeof(packet.pkt_serial)
  192. + sizeof(packet.data_size));
  193. current_write_position +=
  194. packet.data_size
  195. + sizeof(packet.exp_addr)
  196. + sizeof(packet.dest_addr)
  197. + sizeof(packet.msg_size)
  198. + sizeof(packet.data_hash)
  199. + sizeof(packet.pkt_serial)
  200. + sizeof(packet.data_size);
  201. }
  202. current_read_position +=
  203. packet.data_size
  204. + sizeof(packet.exp_addr)
  205. + sizeof(packet.dest_addr)
  206. + sizeof(packet.msg_size)
  207. + sizeof(packet.data_hash)
  208. + sizeof(packet.pkt_serial)
  209. + sizeof(packet.data_size);
  210. }
  211. free(packet.data);
  212. }
  213. // Second step: fill the trame with to-send packets
  214. while (current_write_position - (void*) (frame->packets) < available_size - sizeof(uint16_t) - sizeof(uint8_t))
  215. {
  216. packet = build_next_packet(our_address, (void*) (frame->packets) + available_size - current_write_position);
  217. if (packet.data == NULL) // Si on ne peut pas prendre de paquet, on s'arrête.
  218. break;
  219. if (packet.data_size == 0)
  220. printdbg("\tpacket.data_size == 0 !!!\n");
  221. void* raw_packet;
  222. size_t packet_size = packet_to_raw(&packet, &raw_packet);
  223. memcpy(current_write_position, raw_packet, packet_size);
  224. current_write_position += packet_size;
  225. free(raw_packet);
  226. }
  227. // Third step: fill frame fields
  228. frame->size = current_write_position - (void*) (frame->packets) + sizeof(uint16_t) + sizeof(uint8_t);
  229. frame->token = 1;
  230. }
  void fill_discovery_frame(void* output_raw, uint32_t address)
  /* Fill a raw frame with a new discovery frame.
     Writes 7 bytes at output_raw: the frame size (16-bit, network byte order,
     matching the ntohs done when a frame is received), a zero token byte,
     then `address` copied as-is (host byte order, as before).
     output_raw may have any alignment. */
  {
      unsigned char *cursor = output_raw;
      const uint16_t wire_size =
          htons((uint16_t) (sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint32_t)));

      memcpy(cursor, &wire_size, sizeof wire_size);
      cursor += sizeof wire_size;

      *cursor = 0; /* token == 0 marks a discovery frame */
      cursor += sizeof(uint8_t);

      /* The address lands at offset 3: memcpy avoids a misaligned store. */
      memcpy(cursor, &address, sizeof address);
  }
  /* Parameters handed to the layer1_frame thread. */
  struct layer1_parm_t {
      in_addr_t next_node_ip;    /* copied into sin_addr.s_addr of the next node */
      uint16_t next_node_port;   /* copied into sin_port of the next node */
      uint16_t node_port;        /* copied into sin_port of the address we bind to */
      uint32_t node_address;     /* ring address of this node */
  };
  241. void *layer1_frame(void *parm) {
  242. struct layer1_parm_t *layer_parms = (struct layer1_parm_t*)parm;
  243. /* socket UDP */
  244. int udpsock = socket(PF_INET, SOCK_DGRAM, 0);
  245. if(udpsock == -1) {
  246. perror("socket");
  247. exit(1);
  248. }
  249. /* définir les buffers d'émission/réception à 64k (moins la taille header UDP) */
  250. const int max_frame_size = 65507;
  251. setsockopt(udpsock, SOL_SOCKET, SO_SNDBUF, &max_frame_size, sizeof(int));
  252. setsockopt(udpsock, SOL_SOCKET, SO_RCVBUF, &max_frame_size, sizeof(int));
  253. {
  254. struct timeval tv;
  255. tv.tv_sec = 1;
  256. tv.tv_usec = 0;
  257. setsockopt(udpsock, SOL_SOCKET, SO_RCVTIMEO, (char*) &tv, sizeof(tv));
  258. }
  259. /* adresses "physiques" UDP */
  260. struct sockaddr_in ip_next = {0}, ip_prev = {0};
  261. ip_next.sin_family = ip_prev.sin_family = AF_INET; /* IPv4 */
  262. ip_next.sin_addr.s_addr = layer_parms->next_node_ip; /* adresse du nœud suivant */
  263. ip_next.sin_port = layer_parms->next_node_port; /* port d’écoute du nœud suivant */
  264. ip_prev.sin_addr.s_addr = htonl(INADDR_ANY); /* adresse du nœud précédent (indéterminée) */
  265. ip_prev.sin_port = layer_parms->node_port; /* notre port d’écoute */
  266. /* écouter sur le socket pour le nœud précédent */
  267. if(bind(udpsock, (struct sockaddr*)(&ip_prev), sizeof(struct sockaddr)) == -1) {
  268. /* si déjà pris */
  269. perror("bind");
  270. exit(1);
  271. }
  272. char rawframe[max_frame_size];
  273. enum { quit, discovery, ring } mode = discovery;
  274. // lancer le thread watchdog en lui fournissant &mode(+mutex?) et un mutex ?
  275. do {
  276. /* on attend de recevoir une trame */
  277. ssize_t recvframe_size = recvfrom(udpsock, rawframe, max_frame_size, 0, NULL, NULL);
  278. if(recvframe_size == -1 )
  279. {
  280. if (errno != EAGAIN && errno != EWOULDBLOCK)
  281. {
  282. perror("recvfrom");
  283. break;
  284. }
  285. /* On a perdu la trame => mode découverte.
  286. On attend un temps aléatoire, puis on envoie une trame de découverte. */
  287. printdbg("Timeout: discovery mode\n");
  288. mode = discovery;
  289. printdbg("Temps aléatoire...\n");
  290. usleep(rand() % 1000000);
  291. printdbg("Fin temps aléatoire\n");
  292. fill_discovery_frame(rawframe, layer_parms->node_address);
  293. /* on envoie la trame */
  294. if(sendto(udpsock, rawframe, sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint32_t), 0, (struct sockaddr*)&ip_next, sizeof(struct sockaddr)) == -1)
  295. perror("sendto");
  296. printdbg("Discovery frame sent\n");
  297. }
  298. else {
  299. /* on a obtenu la trame */
  300. size_t read_pos = 0;
  301. struct frame_t frame;
  302. frame.size = ntohs( *((uint16_t*)(rawframe+read_pos)) );
  303. read_pos += sizeof(uint16_t);
  304. frame.token = *((uint8_t*)(rawframe+read_pos));
  305. read_pos += sizeof(uint8_t);
  306. frame.packets = (void*) rawframe + read_pos;
  307. /* Si on reçoit une trame trop courte pour recevoir un jeton, on jette la trame et on passe en mode découverte. */
  308. if (recvframe_size < sizeof(frame.size) + sizeof(frame.token))
  309. {
  310. printdbg("Received too short frame: discovery mode\n");
  311. mode = discovery;
  312. continue;
  313. }
  314. if (!frame.token) // Si c'est une trame de découverte
  315. {
  316. printdbg("Received discovery frame\n");
  317. if (mode == ring) // Les trames de découverte sont jetées si on est en mode ring.
  318. {
  319. printdbg("Dropped discovery frame (we are in token mode)\n");
  320. continue;
  321. }
  322. uint32_t addr = *((uint32_t*)(rawframe+read_pos)); /* on extrait l'adresse de la trame de découverte */
  323. read_pos += sizeof(uint32_t);
  324. if (addr == layer_parms->node_address) // Si l'adresse contenue dans la trame de découverte est la notre
  325. {
  326. printdbg("Oh my god! This discovery frame is mine!\n");
  327. /* On a reçu notre propre trame de découverte, elle a donc fait le tour de l'anneau.
  328. On passe en mode anneau et on envoie la première trame à jeton. */
  329. frame.token = 1;
  330. frame.size = sizeof(frame.size) + sizeof(frame.token);
  331. }
  332. else
  333. {
  334. /* On doit juste renvoyer la trame. Ce bloc « else » est peut-être destiné à disparaître. */
  335. printdbg("Forwarding discovery frame\n");
  336. fprintf(stderr, "%u\t%u\n", addr, layer_parms->node_address);
  337. }
  338. }
  339. if (frame.token) // Si on a reçu une trame à jeton
  340. {
  341. printdbg("Received token frame\n");
  342. /* Traitons-la ! */
  343. mode = ring;
  344. process_token_frame(&frame, layer_parms->node_address, max_frame_size - sizeof(frame.token) - sizeof(frame.size));
  345. }
  346. /* On remet la frame dans rawframe */
  347. read_pos = 0;
  348. *((uint16_t*)(rawframe+read_pos)) = htons(frame.size);
  349. read_pos += sizeof(uint16_t);
  350. *((uint8_t*)(rawframe+read_pos)) = frame.token;
  351. read_pos += sizeof(uint8_t);
  352. memcpy((void*) (frame.packets), (void*) (rawframe + read_pos), frame.size);
  353. /* on renvoie la trame */
  354. if(sendto(udpsock, rawframe, frame.size, 0, (struct sockaddr*)&ip_next, sizeof(struct sockaddr)) == -1)
  355. perror("sendto");
  356. usleep(100000);
  357. }
  358. } while(mode != quit);
  359. /* fermeture socket */
  360. close(udpsock);
  361. return NULL;
  362. }
  int is_ushort(char *str)
  /* Returns non-zero if str is entirely a decimal integer in 1..65535
     (leading whitespace accepted by strtol is tolerated), 0 otherwise.
     Trailing characters ("80x") make the string invalid. */
  {
      char *end = str;
      long int val = strtol(str, &end, 10);
      /* On overflow strtol returns LONG_MAX, which the range test rejects. */
      return ( end != str && *end == '\0' && val > 0 && val < 65536 );
  }
  int is_uint32(char *str)
  /* Returns non-zero if str is entirely a decimal integer in 1..4294967295
     (leading whitespace accepted by strtoll is tolerated), 0 otherwise.
     Trailing characters make the string invalid. */
  {
      char *end = str;
      /* long long is at least 64 bits wide, so the upper bound is meaningful
         even where long is 32 bits. On overflow strtoll returns LLONG_MAX,
         which the range test rejects. */
      long long val = strtoll(str, &end, 10);
      return ( end != str && *end == '\0' && val > 0 && val <= (long long) UINT32_MAX );
  }
  374. int main(int argc, char **argv) {
  375. uint32_t id;
  376. if (argc < 6 || !is_ushort(argv[2]) || !is_ushort(argv[3]) || !char_hexa_to_uint32(argv[4], &id) || !is_ushort(argv[5])) {
  377. puts("parms: <next node IP> <next node port> <our node port> <our ID> <subscribe port> ");
  378. return 1;
  379. }
  380. /* threads de la couche 1, transmission de la trame */
  381. struct layer1_parm_t layer1_parm;
  382. layer1_parm.next_node_ip = inet_addr(argv[1]);
  383. layer1_parm.next_node_port = htons(atoi(argv[2]));
  384. layer1_parm.node_port = htons(atoi(argv[3]));
  385. layer1_parm.node_address = id;
  386. printfdbg("Subscribe port: %d\n", atoi(argv[5]));
  387. /* Initialisation de la liste d'inscriptions */
  388. list_subscription_init(&list_subscription, atoi(argv[5]));
  389. pthread_t layer1_thread; /* réception/réémission trame */
  390. pthread_create(&layer1_thread, NULL, layer1_frame, &layer1_parm);
  391. /* fin du programme */
  392. pthread_join(layer1_thread, NULL);
  393. /* Desctruction de la liste d'inscriptions */
  394. list_subscription_destroy(&list_subscription);
  395. return 0;
  396. }