// This is the main file of the Dazibao project. It represents the node, and // handles all of the main logic, including the network connexions. #include #include #include #include #include #include #include #include #include #include #include #include #include "node.h" // Static variables static list *data_list; static list *neighbour_list; /* ---- Fonctions utilitaires ---- */ // Get list length int len_list(list *l) { int len = 0; list *tmp = l; while(tmp != NULL) { tmp = tmp->next; len++; } return len; } // Get a random neighbour neighbour_peer *get_random_neighbour() { // Get a random number time_t t; srand((unsigned) time(&t)); int n = rand() % len_list(neighbour_list); // Get nth neighbour list *tmp = neighbour_list; for(int i=0; inext; } return (neighbour_peer*) tmp->data; } // get data associated with id, if it doesn't exist return NULL pub_data *get_data(int64_t id) { list *tmp = data_list; pub_data *data; while(tmp != NULL) { data = (pub_data*) tmp->data; if(data->id == id) return data; } return NULL; } // Take data as args and create a pub_data structure in the heap pub_data *copy_data(unsigned char len, int64_t id, int16_t seqno, char *data) { pub_data *new_data = (pub_data*) malloc(sizeof(pub_data)); char *_data = (char*) malloc(len); new_data->length = len; new_data->id = id; new_data->seqno = seqno; new_data->data = _data; memcpy(_data, data, len); return new_data; } // Add new data to data list void add_data(unsigned char len, int64_t id, int16_t seqno, char *data) { // If id is the same as this node's id then we only update seqno if(id == NODE_ID) { pub_data *node_data = get_data(NODE_ID); if(seqno >= node_data->seqno) { node_data->seqno = seqno ^ 1; } return; } // Copy data pub_data *new_data = copy_data(len, id, seqno, data); if(data_list == NULL) { // Update list data_list = (list*) malloc(sizeof(list)); data_list->data = (void*) new_data; data_list->next = NULL; return; } // Find correct position for new data list *tmp = data_list; list *last = NULL; list *new_node; int64_t cur_id; while(tmp != NULL) { cur_id = ((pub_data*) tmp->data)->id; // If id is smaller than cur_id then the new data has to be added at this position if(id < cur_id) { // If last hasn't been set then the new data becomes the head of the list if(last == NULL) { // Update list data_list = (list*) malloc(sizeof(list)); data_list->data = (void*) new_data; data_list->next = tmp; return; } // Else, we update the last node new_node = (list*) malloc(sizeof(list)); new_node->data = (void*) new_data; new_node->next = tmp; last->next = new_node; return; } else if(id == cur_id) { // If data already exists for this id then we update it if it's seqno is greater than the one stored pub_data *cur_data = (pub_data*) tmp->data; if(seqno > cur_data->seqno) { // Updata data tmp->data = (void*) new_data; // Free old data free(cur_data); return; } // seqno is smaller so the new data allocated is freed and nothing else is done free(new_data); return; } // Get next node in list last = tmp; tmp = tmp->next; } // If no correct position was found then the new data has to be added at the end of the list // Update list new_node = (list*) malloc(sizeof(list)); new_node->data = (void*) new_data; new_node->next = NULL; last->next = new_node; } /* ---- Fin fonctions utilitaires ---- */ // Add TLV to packet, if it does not fit then send the packet and reset the packet buff to be able to add more TLVs that will be sent afterwards int add_tlv(packet *pack, tlv *tlv, struct sockaddr_in6 *dest, int socket_num) { char type = tlv->pad1->type, sent = 0, errval = 0; unsigned char len; // Check if TLV fits in the packet, if not then send the packet and reset it if(type != 1) { len = tlv->padn->length + 2; if(pack->length + len > 1020) { errval = send_packet((char*) pack, pack->length, dest, socket_num); *pack = (packet) {.magic = 95, .version = 1, .length = 0}; memset(pack->body, 0, 1020); sent = 1; } } else { if(pack->length >= 1020) { errval = send_packet((char*) pack, pack->length, dest, socket_num); *pack = (packet) {.magic = 95, .version = 1, .length = 0}; memset(pack->body, 0, 1020); sent = 1; } } // Copy data from tlv into body switch(type) { case 1: memcpy(pack->body + pack->length, tlv->pad1, 1); pack->length += 1; break; case 2: memcpy(pack->body + pack->length, tlv->padn, len); pack->length += len; break; case 3: memcpy(pack->body + pack->length, tlv->neighbour, len); pack->length += len; break; case 4: memcpy(pack->body + pack->length, tlv->network_hash, len); pack->length += len; break; case 5: memcpy(pack->body + pack->length, tlv->network_state_req, len); pack->length += len; break; case 6: memcpy(pack->body + pack->length, tlv->node_hash, len); pack->length += len; break; case 7: memcpy(pack->body + pack->length, tlv->node_state_req, len); pack->length += len; break; case 8: memcpy(pack->body + pack->length, tlv->node_state, len); pack->length += len; break; case 9: memcpy(pack->body + pack->length, tlv->warning, len); pack->length += len; break; default: return -1; } // If the previous packet was went return 1 or -1 if there was an error sending it if(sent) return errval? -1:1; // Return 0 if the TLV was added to the packet return 0; } // Send length bytes from packet int send_packet(char *packet_buff, int16_t length, struct sockaddr_in6 *dest, int socket_num) { // Vectorized buffer struct iovec vec_buff = {.iov_len = length, .iov_base = packet_buff}; int error_while_sending = 0; // Creating the struct to send out with sendmsg struct msghdr packet_tlv_send_out = { .msg_name = dest, .msg_namelen = sizeof(struct sockaddr_in6), .msg_iov = &vec_buff, .msg_iovlen = 1 // We have only one iovec buffer. But if we had 2, we would write 2. }; int response_code = sendmsg(socket_num, &packet_tlv_send_out, 0); if (response_code < 0) { // debug_print("Unable to send out the packet to peer %i", i); error_while_sending = 1; } else if (response_code < length) { // debug_print("Sent out only part of the packet."); error_while_sending = 1; } else { // debug_print("Send out packet to peer %i", i); } if (error_while_sending == 1) { // debug_print("Error occured while sending out a packet."); return -1; } else { return 0; } } // Send a single TLV to the specified addresses, return -1 if an error was encountered, 0 otherwise int send_single_tlv(tlv *tlv, struct sockaddr_in6 *dest, int socket_num) { char type = tlv->pad1->type; unsigned char len; packet pack = (packet) {.magic = 95, .version = 1, .length = 4}; memset(pack.body, 0, 1020); // Copy data from tlv into body switch(type) { case 1: memcpy(pack.body, tlv->pad1, 1); pack.length += 1; break; case 2: len = tlv->padn->length + 2; memcpy(pack.body, tlv->padn, len); pack.length += len; break; case 3: len = tlv->neighbour->length + 2; memcpy(pack.body, tlv->neighbour, len); pack.length += len; break; case 4: len = tlv->network_hash->length + 2; memcpy(pack.body, tlv->network_hash, len); pack.length += len; break; case 5: len = tlv->network_state_req->length + 2; memcpy(pack.body, tlv->network_state_req, len); pack.length += len; break; case 6: len = tlv->node_hash->length + 2; memcpy(pack.body, tlv->node_hash, len); pack.length += len; break; case 7: len = tlv->node_state_req->length + 2; memcpy(pack.body, tlv->node_state_req, len); pack.length += len; break; case 8: len = tlv->node_state->length + 2; memcpy(pack.body, tlv->node_state, len); pack.length += len; break; case 9: len = tlv->warning->length + 2; memcpy(pack.body, tlv->warning, len); pack.length += len; break; default: return -1; } // Send the packet return send_packet((char*) &pack, pack.length, dest, socket_num); } int send_tlv(tlv *tlv_to_send, int16_t tlv_size, struct sockaddr_in6 * dest_list, int dest_list_size, int socket_num){ // debug_print("Building packet to send a TLV."); // We first need to build the packet, char packet_buff[1024]; struct packet pack; pack.magic = 95; pack.version = 1; if (tlv_size > 1020) { perror(">> Unable to send the tlv, it's size if above 1020 bytes."); return -1; } else { memcpy((void *) pack.body, tlv_to_send, tlv_size); } // Move the content of the paquet struct to a buffer // That will be send out in a vectorized buffer. // packet_buff = (char *) pack; memcpy(&packet_buff,&pack,1024); // debug_print("Packet has been built."); // Vectorized buffer struct iovec vec_buff = { .iov_len = sizeof(packet_buff), .iov_base = packet_buff }; int error_while_sending = 0; // For every dest for (size_t i = 0; i < dest_list_size; i++) { // Creating the struct to send out with sendmsg struct msghdr packet_tlv_send_out = { .msg_name = &dest_list[i], .msg_namelen = sizeof(dest_list[i]), .msg_iov = &vec_buff, .msg_iovlen = 1 // We have only one iovec buffer. But if we had 2, we would write 2. }; int response_code = sendmsg((int) socket_num, &packet_tlv_send_out, 0); if (response_code < 0) { // debug_print("Unable to send out the packet to peer %i", i); error_while_sending = 1; continue; } else if (response_code < sizeof(packet_tlv_send_out)) { // debug_print("Sent out only part of the packet."); error_while_sending = 1; continue; } else { // debug_print("Send out packet to peer %i", i); } } if (error_while_sending == 1) { // debug_print("Error occured while sending out a packet."); return -1; } else { return 0; } } // We need to make sure the TLV announces a length that will no go onto // another tlv, as we might end up reading bullshit. int validate_tlv(char *data, int pos, int16_t packet_len){ char type = data[pos]; // Nothing to do in this case if(type == 0) return 0; // Check that we can read a length if(pos + 1 >= packet_len) return -1; unsigned char tlv_len = data[pos+1]; // Check that the tlv does not exceed the packet length if(pos + tlv_len >= packet_len) return -1; // Returns the type of the tlv or -1 if something went wrong switch(type) { case 1: return 1; case 2: if(tlv_len != LEN_NEIGHBOUR_REQ) return -1; return 2; case 3: if(tlv_len != LEN_NEIGHBOUR) return -1; return 3; case 4: if(tlv_len != LEN_NETWORK_HASH) return -1; return 4; case 5: if(tlv_len != LEN_NETWORK_STATE_REQ) return -1; return 5; case 6: if(tlv_len != LEN_NODE_HASH) return -1; return 6; case 7: if(tlv_len != LEN_NODE_STATE_REQ) return -1; return 7; case 8: if(tlv_len < MIN_LEN_NODE_STATE || tlv_len > MAX_LEN_NODE_STATE) return -1; return 8; case 9: return 9; default: return -1; } } // For every packet recivied, // then we make sure it's conform // We then extract the data from it to make it easy to work with int check_header(char * received_data_buffer[], int received_data_len, struct packet * packet_to_return){ packet_to_return = (packet*) received_data_buffer; // We need to check a few things ; // The first byte must be worth 95, if (packet_to_return->magic != 95) { perror(">> The magic number of the packet is no good."); return -1; } // The second byte must be worth 1, if (packet_to_return->version != 1) { perror(">> The version number of the packet is no good."); return -1; } if (packet_to_return->length + 4 > received_data_len ) { perror(">> The packet length is bigger than the UDP datagram, which is not possible with the current laws of physics."); return -1; } return 0; } // If the sender is not in the neighbourhood, and we have 15 neighbours, we // ignore the packet. Otherwise, we add him to the neighbourhood, marked as // temporary. int update_neighbours(){ return 0; }; int add_message(char * message, int message_len){ return 0; } // We then look at the differents TLVs in the packet. int work_with_tlvs(char * data, int16_t packet_len, struct sockaddr_in6 *sender, int socket_num){ int pos = 0; unsigned char tlv_len, hash[16]; char warn[32]; tlv new_tlv, cur_tlv; new_tlv.pad1 = NULL; cur_tlv.pad1 = NULL; list *tmp_list; pub_data *pdata; struct neighbour_peer *random_neighbour; struct sockaddr_in6 new_neighbour; packet pack = (packet) {.magic = 95, .version = 1, .length = 0}; memset(pack.body, 0, 1020); int ifindex = if_nametoindex("eth0"); if(ifindex == 0) { perror("if_nametoindex failed"); return -1; } while(pos < packet_len) { switch(validate_tlv(data, pos, packet_len)) { case 0: // We received a padding tlv so it is ignored pos += 1; break; case 1: // We received a padding tlv so it is ignored tlv_len = data[pos+1]; pos += tlv_len + 2; break; case 2: // We received a neighbour request so a random neighbor tlv has to be sent // Send a neighbour tlv random_neighbour = get_random_neighbour(); build_neighbour(&new_tlv, random_neighbour->ip, random_neighbour->port); add_tlv(&pack, &new_tlv, sender, socket_num); // The position is updated tlv_len = data[pos+1]; pos += tlv_len + 2; break; case 3: // We received a neighbour tlv so a tlv network hash is sent to that address cur_tlv.neighbour = (neighbour*) (data + pos); // Init dest socket memset(&new_neighbour, 0, sizeof(new_neighbour)); new_neighbour.sin6_family = AF_INET6; memcpy(&new_neighbour.sin6_addr, &cur_tlv.neighbour->ip, 16); new_neighbour.sin6_port = htons(LISTEN_PORT); new_neighbour.sin6_scope_id = ifindex; // Build network hash build_network_hash(&new_tlv, data_list); send_single_tlv(&new_tlv, &new_neighbour, socket_num); // The position is updated tlv_len = data[pos+1]; pos += tlv_len + 2; break; case 4: // We reveived a network hash tlv so we compare the hash with our own, if they differ we send a network state request tlv cur_tlv.network_hash = (network_hash*) (data + pos); hash_network(data_list, hash); if(memcmp(hash, cur_tlv.network_hash->network_hash, 16) == 0) { build_network_state_req(&new_tlv); add_tlv(&pack, &new_tlv, sender, socket_num); } // The position is updated tlv_len = data[pos+1]; pos += tlv_len + 2; break; case 5: // We received a network state request tlv so a series of tlv node hash have to be sent for each data known // for each known data build a node hash and add to packet tmp_list = data_list; while(tmp_list != NULL) { pdata = (pub_data*) tmp_list->data; build_node_hash(&new_tlv, pdata->id, pdata->seqno, pdata->data); add_tlv(&pack, &new_tlv, sender, socket_num); } // The position is updated pos += 2; break; case 6: // We received a node hash tlv so if there is no entry for node_id in the data list or the hashes differ we send a node state request, if the hashes are identical nothing has to be done cur_tlv.node_hash = (node_hash*) (data + pos); pdata = get_data(cur_tlv.node_hash->node_id); // If data is found for this id then we check that both hashes are the same if(pdata != NULL) { // We hash the data stored in the data list hash_data(pdata, hash); // If both hashes are the same then nothing has to be done if(memcmp(hash, cur_tlv.node_hash->node_hash, 16) != 0) { // The position is updated tlv_len = data[pos+1]; pos += 2; break; } } // If no pub_data was found or the hashes differ then we send a node state request build_node_state_req(&new_tlv, cur_tlv.node_hash->node_id); add_tlv(&pack, &new_tlv, sender, socket_num); // The position is updated tlv_len = data[pos+1]; pos += tlv_len + 2; break; case 7: // We received a node state request tlv so a node state tlv for this node id has to be sent, if no pub_data exists for this id nothing is sent cur_tlv.node_state_req = (node_state_req*) (data + pos); pdata = get_data(cur_tlv.node_state_req->node_id); if(pdata != NULL) { build_node_state(&new_tlv, pdata->id, pdata->seqno, pdata->data, pdata->length); add_tlv(&pack, &new_tlv, sender, socket_num); } // The position is updated tlv_len = data[pos+1]; pos += tlv_len + 2; break; case 8: // We received a node state tlv so we add it to the data list or update the data stored cur_tlv.node_state = (node_state*) (data + pos); add_data(cur_tlv.node_state->length - 26, cur_tlv.node_state->node_id, cur_tlv.node_state->seqno, cur_tlv.node_state->data); // The position is updated tlv_len = data[pos+1]; pos += tlv_len + 2; break; case 9: // We received a warning tlv so it's message is printed cur_tlv.warning = (warning*) (data + pos); // Print exactly new_tlv.length characters from new_tlv.message sprintf(warn, ">> WARNING:\n%%.%ds", cur_tlv.warning->length + 1); printf(warn, cur_tlv.warning->message); // The position is updated tlv_len = data[pos+1]; pos += tlv_len + 2; break; default: // A malformed packet was found so we stop looking for more packets and send a warning tlv strcpy(warn, "Packet is malformed."); build_warning(&new_tlv, warn, strlen(warn)); add_tlv(&pack, &new_tlv, sender, socket_num); return -1; } } // Free the previously allocated memory free(new_tlv.pad1); // If the packet still has data in it then send it if(pack.length > 0) send_packet((char*) &pack, pack.length, sender, socket_num); return 0; } int listen_for_packets(char * received_data_buffer[], int received_data_len, struct sockaddr_in6 * sender, int sock_fd){ // We verify the received packet is well formated, // and we return it in the struct designed to work with it. struct packet formated_rec_datagram; if(check_header(received_data_buffer, received_data_len, &formated_rec_datagram) < 0){ perror(">> Error while checking the header, aborting this packet, by choice, and conviction."); return -1; } // TODO : Add the neighbour check here. // struct tlv_list received_tlvs; // if (validate_tlvs(formated_rec_datagram) < 0) int nbr_success_tlv = work_with_tlvs(received_data_buffer, received_data_len, sender, sock_fd); if (nbr_success_tlv < 0){ perror(">> Error while treating the TLVs of the packet."); printf(">> Managed to deal with %i TLVs\n", -nbr_success_tlv ); return -2; } else { printf(">> Done working with the TLVs of the packet, listening for new packets.\n"); return 0; } } int t_ask_for_more_peers(){ return 0; } int t_get_network_state(){ return 0; } int t_update_neighbours(){ return 0; } int run_node(int sock_fd){ printf(">> Running node...\n"); int ret; ssize_t bytes; char input_buffer[1024]; char output_buffer[1024]; struct pollfd fds[2]; // Init the ~20s delay for node update. srand(time(NULL)); int delay = time(NULL) + 20; /* Descriptor zero is stdin */ fds[0].fd = 0; fds[1].fd = sock_fd; fds[0].events = POLLIN | POLLPRI; fds[1].events = POLLIN | POLLPRI; /* Normally we'd check an exit condition, but for this example * we loop endlessly. */ while (1) { if (time(NULL) >= delay) { printf(">> Asking for more peers...\n"); t_ask_for_more_peers(); printf(">> Updating neighbours...\n"); t_update_neighbours(); printf(">> Getting network state...\n"); t_get_network_state(); delay = time(NULL) + 20 + (rand() % 10); } // This might be cool to add, but we need to find a way to write to stdin // while it's running. // if (time(NULL) < delay) { // // Thanks to : // // https://gist.github.com/amullins83/24b5ef48657c08c4005a8fab837b7499 // printf("\b\x1b[2K\r>> Next request in %li seconds..", delay - time(NULL)); // fflush(stdout); // } // printf("\n"); /* Call poll() */ ret = poll(fds, 2, 5); if (ret < 0) { printf(">> Error - poll returned error: %s\n", strerror(errno)); break; } else if (ret > 0) { /* Regardless of requested events, poll() can always return these */ if (fds[0].revents & (POLLERR | POLLHUP | POLLNVAL)) { printf("Error - poll indicated stdin error\n"); break; } if (fds[1].revents & (POLLERR | POLLHUP | POLLNVAL)) { printf("Error - poll indicated socket error\n"); break; } // Read data from stdin (new message to post ) if (fds[0].revents & (POLLIN | POLLPRI)) { bytes = read(0, input_buffer, sizeof(input_buffer)); if (bytes < 0) { printf("Error - stdin error: %s\n", strerror(errno)); break; } input_buffer[strcspn(input_buffer, "\n")] = 0; printf(">> Adding following message to the table : ā€œ%sā€\n", input_buffer ); // Add message to the message table. if (add_message(&input_buffer, bytes) < 0) { perror(">> Error while trying to add the message to the list of messages, please try again.."); } } // Read data from the socket ( incoming packet ) if (fds[1].revents & (POLLIN | POLLPRI)) { // Vectorized buffer struct iovec vec_buff_rec = { .iov_len = sizeof(output_buffer), .iov_base = output_buffer }; struct sockaddr_in6 sender; // Creating the struct receive the server reponse. // Is empty, will be filled by recvmsg() struct msghdr msg_from_peer = { .msg_name = &sender, .msg_namelen = sizeof(sender), .msg_iov = &vec_buff_rec, .msg_iovlen = 1 // We have only one iovec buffer. But if we had 2, we would write 2. }; bytes = recvmsg(sock_fd, &msg_from_peer, 0); if (bytes < 0) { printf("Error - recvfrom error: %s\n", strerror(errno)); break; } if (bytes > 0) { printf("Received: %.*s\r", (int)bytes, output_buffer); // Treat incoming packets. int work_tlv_status = listen_for_packets(&output_buffer, bytes, &sender, sock_fd); if (work_tlv_status < 0) { perror(">> Error while treating the incoming packet."); } } } } else { continue; } } return 0; } int bootstrap_node(int * sock_fd){ printf(">> Boostraping node...\n"); struct sockaddr_in6 server_addr; /* Create UDP socket */ * sock_fd = socket(AF_INET6, SOCK_DGRAM, 0); if ( * sock_fd < 0) { printf("Error - failed to open socket: %s\n", strerror(errno)); return -1; } /* Bind socket */ memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin6_family = AF_INET6; // server_addr.sin6_addr.in6_addr = htonl(INADDR_ANY); server_addr.sin6_port = htons(LISTEN_PORT); if (bind( * sock_fd, (struct sockaddr *)(&server_addr), sizeof(server_addr)) < 0) { printf("Error - failed to bind socket: %s\n", strerror(errno)); return -2; } printf(">> Boostraping done.\n"); return 0; } int main(int argc, const char *argv[]) { printf(">> Starting node\n"); int sock_fd; bootstrap_node(&sock_fd); run_node(sock_fd); close(sock_fd); return 0; }