Hi Don and Peter,
Thanks for your responses.
I have modified my code to address points 1 and 2.
The biggest issue was the one mentioned in point 3: the code was assigning (=)
where it should have been comparing (==). Fixing that made the code work.
With the above changes I can now read static HTML files from the web server
using client code that sends the request - GET /index.html HTTP/1.1
I have two further questions -
A) With the above changes, the code no longer SEGFAULTs and runs well. The code
that processes the work is below.
I would like to know what improvements can be made to it, especially in the
'accept_requests' and 'process_requests' functions. The code currently works fine.
int main (int argc, char *argv[]) {
    //////////
    // Code to create socket, bind and listen for incoming connections
    //////////
    /* Keep waiting for connections */
    while(1) {
        /* Accepting connections from the client */
        clilen = sizeof(cli_addr);
        newsockfd = accept (sockfd, (struct sockaddr *) &cli_addr,
                            &clilen);
        if (newsockfd < 0) {
            error ("ERROR on accept");
        }
        /* Add work to the queue for threads to work upon */
        threadpool_add_work((void *) threadp, (*accept_requests),
                            (void *) newsockfd);
    }
    //////////
}
void *accept_requests(void *params) {
    int socket_fd = (int) params;
    pthread_t tid = pthread_self();
    /* Take the data from socket and pass it for processing */
    process_requests(socket_fd);
    printf("Done processing: %d\n", tid);
    pthread_exit(NULL);
}
void process_requests(int socket_fd) {
    int n, ret;
    char buffer[256];
    char line[100], method[100], path[100], protocol[100];
    char *file;
    FILE *fp;
    size_t bytes_read;
    /* Initializing the buffer */
    bzero(buffer, 256);
    /* Reading from socket */
    n = read (socket_fd, buffer, 255);
    if (n < 0) {
        error("ERROR reading from socket");
    }
    printf("Client communication message:: %s\n", buffer);
    ret = sscanf(buffer, "%[^ ] %[^ ] %[^ ]", method, path, protocol);
    if (ret != 3) {
        printf(" Bad Request : Can't parse request ");
    }
    file = &(path[1]);
    fp = fopen(file, "r");
    if (fp == NULL) {
        error (" Could not open the file ");
    } else {
        bzero(buffer, 256);
        bytes_read = fread(buffer, sizeof(buffer), 1, fp);
    }
    write(socket_fd, buffer, 256);
    close(socket_fd);
}
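One possible direction for 'process_requests', offered only as a sketch: bound the sscanf field widths so a long request cannot overflow the 100-byte arrays, and send only the bytes that fread actually returned rather than a fixed 256. The helper names parse_request_line, write_all and send_file are made up for illustration and are not part of the posted code.

```c
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical helper: split "GET /index.html HTTP/1.1" into three fields.
 * Each field is limited to 99 characters plus the terminating NUL.
 * Returns 0 on success, -1 if the line does not have three fields. */
int parse_request_line(const char *buf, char method[100],
                       char path[100], char protocol[100])
{
    int ret = sscanf(buf, "%99[^ ] %99[^ ] %99[^ \r\n]",
                     method, path, protocol);
    return (ret == 3) ? 0 : -1;
}

/* Hypothetical helper: write exactly len bytes, retrying on short writes.
 * Returns 0 on success, -1 on any write error. */
int write_all(int fd, const char *data, size_t len)
{
    while (len > 0) {
        ssize_t n = write(fd, data, len);
        if (n <= 0)
            return -1;
        data += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Hypothetical helper: copy an open file to fd in 256-byte chunks,
 * sending only as many bytes as fread reported for each chunk. */
int send_file(int fd, FILE *fp)
{
    char chunk[256];
    size_t got;

    while ((got = fread(chunk, 1, sizeof chunk, fp)) > 0) {
        if (write_all(fd, chunk, got) < 0)
            return -1;
    }
    return ferror(fp) ? -1 : 0;
}
```

In the posted code, fread(buffer, sizeof(buffer), 1, fp) returns a count of whole 256-byte items, so it yields 0 for any shorter file; asking for 1-byte items, as above, gives a usable byte count. Separately, since threadpool_thread() calls the routine directly, the pthread_exit(NULL) in 'accept_requests' ends the pool worker itself; returning NULL instead would let the worker loop back to the queue.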
B) I have the following typedefs in my header file -
1) threadpool_t, which is a pointer type - a pointer to the threadpool structure.
>> typedef struct threadpool {
>> /* Pool Characteristics */
>> int num_threads;
>> int max_queue_size;
>>
>> int do_not_block_when_full;
>> pthread_t *threads;
>> int cur_queue_size;
>> threadpool_work_t *queue_head;
>> threadpool_work_t *queue_tail;
>>
>> pthread_mutex_t queue_lock;
>> pthread_cond_t queue_not_empty;
>> pthread_cond_t queue_not_full;
>> pthread_cond_t queue_empty;
>>
>> int queue_closed;
>> int shutdown;
>> } *threadpool_t;
2) I declare in my (current modified code) main method -
threadpool_t threadp;
which by header definition above => threadp is of type threadpool_t (typedef),
which is a pointer to struct threadpool.
This 'threadp' variable is malloced before threadpool_init is called as -
if ((threadp = malloc(sizeof(threadp))) == NULL) {
perror("malloc");
exit(1);
}
I then call the threadpool_init function, which has the following signature -
void threadpool_init(threadpool_t *threadp,
int num_worker_threads,
int max_queue_size,
int do_not_block_when_full);
I pass '&threadp' as the first argument, i.e. the address of the variable
'threadp', which itself holds a pointer to a struct threadpool.
But inside 'threadpool_init' there is a local variable 'threadP' of type
'threadpool_t' -
threadpool_t threadP;
Here again, I malloc 'threadP' as -
if ((threadP = malloc(sizeof(*threadP))) == NULL) {
perror("malloc");
exit(1);
}
Note that the malloc() call for 'threadP' passes '*threadP' to sizeof, whereas
the malloc() call for 'threadp' passes 'threadp'. So in one case I am passing
the dereferenced pointer, and in the other just the variable of type
'threadpool_t'.
I tried passing just the variable of type 'threadpool_t', i.e. 'threadP', to
sizeof in the malloc code above, but that causes the program to hang indefinitely.
I am not sure why the two mallocs behave differently. Why can't the second
malloc be written as follows -
if ((threadP = malloc(sizeof(threadP))) == NULL) {
perror("malloc");
exit(1);
}
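The difference can be seen by comparing the two sizes directly. The sketch below uses a made-up stand-in struct (pool_sketch, with illustrative fields only) rather than the real struct threadpool. sizeof applied to the pointer variable gives the size of a pointer (typically 4 or 8 bytes), while sizeof applied to the dereferenced pointer gives the size of the whole struct.

```c
#include <stdlib.h>
#include <string.h>

/* Stand-in for the real struct threadpool; the fields are illustrative only. */
struct pool_sketch {
    int num_threads;
    int max_queue_size;
    int cur_queue_size;
    void *queue_head;
    void *queue_tail;
    char padding[64];   /* stands in for the mutex and condition variables */
};
typedef struct pool_sketch *pool_sketch_t;

/* What malloc(sizeof(p)) would request: the size of the pointer itself. */
size_t size_of_pointer(void)
{
    pool_sketch_t p = NULL;
    return sizeof(p);
}

/* What malloc(sizeof(*p)) requests: the size of the struct pointed to.
 * sizeof does not evaluate its operand, so p being NULL is harmless here. */
size_t size_of_struct(void)
{
    pool_sketch_t p = NULL;
    return sizeof(*p);
}

/* Allocate and zero a full struct using the sizeof(*p) form. */
pool_sketch_t pool_sketch_alloc(void)
{
    pool_sketch_t p = malloc(sizeof(*p));
    if (p != NULL)
        memset(p, 0, sizeof(*p));
    return p;
}
```

With malloc(sizeof(threadP)) the block is only pointer-sized, so initializing the fields, mutex and condition variables writes past its end. That is undefined behaviour, and a hang is one of the ways it can show up.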
Note that at the end of the 'threadpool_init' function 'threadP' is assigned
to '*threadp' as -
====>>>> *threadp = threadP; // In the last line of 'threadpool_init' function
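That final assignment is the usual out-parameter pattern: the init function allocates the struct and stores the pointer in the caller's variable. A minimal sketch under assumptions follows; pool_sketch and pool_sketch_init are invented names with only two illustrative fields, and unlike threadpool_init the sketch returns an error code instead of calling exit().

```c
#include <stdlib.h>

/* Stand-in for the real struct threadpool; fields are illustrative only. */
struct pool_sketch {
    int num_threads;
    int max_queue_size;
};
typedef struct pool_sketch *pool_sketch_t;

/* Hypothetical init in the style of threadpool_init(): it allocates the
 * struct itself and hands the pointer back through the out-parameter.
 * Returns 0 on success, -1 if the allocation fails. */
int pool_sketch_init(pool_sketch_t *poolp, int num_threads, int max_queue_size)
{
    pool_sketch_t pool = malloc(sizeof(*pool));
    if (pool == NULL)
        return -1;

    pool->num_threads = num_threads;
    pool->max_queue_size = max_queue_size;

    *poolp = pool;   /* the caller's variable now points at the new struct */
    return 0;
}
```

A caller would only need to declare 'pool_sketch_t pool = NULL;' and pass '&pool'. If the caller mallocs a block first, as main does for 'threadp', the assignment through the out-parameter replaces that pointer, so the earlier block is never used and is leaked. That would also explain why the undersized malloc in main appears to do no harm.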
Appreciate your help on this so far.
Thanks
Mahendra
Don Morris wrote:
> Mahendra Kumar Kutare wrote:
>>
>> Thanks for your response. Sorry for multiple postings.
>>
>> Now, coming back to the problem -
>>
>
> 3 issues stand out for me:
>
> 1) You don't check the return value of malloc() when allocating
> workp in threadpool_add_work(). Granted, you don't immediately die
> touching it... but you should check.
>
> 2) As previously noted, make sure you've included stdlib.h and get
> rid of the malloc() casting.
>
> 3) I think your problem is this line in threadpool_thread():
> if ((!threadpool->do_not_block_when_full) &&
> (threadpool->cur_queue_size =
> (threadpool->max_queue_size -1))) {
>
> If the threadpool is not marked as "do_not_block_when_full", you're
> always setting the size of the circular queue to be the max - 1
> [because I think this is a typo where you have "=" instead of "=="].
> As a result, the queue will erroneously report it has elements when
> you've in fact removed them all... hence you dereference the NULL pointer
> at the tail of the queue and die.
>
> Since your main() does mark the threadpool via threadpool_init with
> 0 for the do_not_block_when_full state... this seems consistent to me.
>
> Don
>
>> Code for the main method is as below -
>>
>> ************************************************** ************************
>>
>>
>> int main (int argc, char *argv[]) {
>>
>> threadpool_t threadpool;
>> // Code to create INET socket
>> // bind, accept and listening socket code
>>
>> /* Allocate a pool data structure */
>> if ((threadpool = (threadpool_t) malloc(sizeof(struct
>> threadpool))) == NULL) {
>> perror("malloc");
>> exit(1);
>> }
>>
>> /* Initialize the thread pool */
>> threadpool_init(&threadpool, NUM_WORKER_THREADS, MAX_QUEUE_SIZE,
>> 0);
>>
>> while(1) {
>> /* Accepting connections from the client */
>> clilen = sizeof(cli_addr);
>> newsockfd = accept (sockfd, (struct sockaddr *)
>> &cli_addr, &clilen);
>> if (newsockfd < 0) {
>> error ("ERROR on accept");
>> }
>>
>> threadpool_add_work((void *) threadpool,
>> (*accept_requests), (void *) newsockfd);
>> }
>>
>> return 0;
>> }
>> ************************************************** ************************
>>
>>
>> void threadpool_init(threadpool_t *threadpoolp,
>> int num_worker_threads,
>> int max_queue_size,
>> int do_not_block_when_full) {
>>
>> int i, rtn;
>> threadpool_t threadpool;
>>
>> /* Allocate a pool data structure */
>> if ((threadpool = (threadpool_t) malloc(sizeof(struct
>> threadpool))) == NULL) {
>> perror("malloc");
>> exit(1);
>> }
>>
>> /* Initialize the fields */
>> threadpool->num_threads = num_worker_threads;
>> threadpool->max_queue_size = max_queue_size;
>> threadpool->do_not_block_when_full = do_not_block_when_full;
>>
>> if ((threadpool->threads = (pthread_t *)
>> malloc(sizeof(pthread_t) *num_worker_threads)) == NULL) {
>> perror("malloc");
>> exit(1);
>> }
>>
>> threadpool->cur_queue_size = 0;
>> threadpool->queue_head = NULL;
>> threadpool->queue_tail = NULL;
>>
>> threadpool->queue_closed = 0;
>> threadpool->shutdown = 0;
>>
>> if ((rtn = pthread_mutex_init(&(threadpool->queue_lock), NULL))
>> != 0) {
>> fprintf (stderr, "pthread_mutex_init %s", strerror(rtn));
>> exit(1);
>> }
>>
>> if ((rtn = pthread_cond_init(&(threadpool->queue_not_empty),
>> NULL)) != 0) {
>> fprintf (stderr, "pthread_cond_init %s", strerror(rtn));
>> exit(1);
>> }
>>
>> if ((rtn = pthread_cond_init(&(threadpool->queue_not_full),
>> NULL)) != 0) {
>> fprintf (stderr, "pthread_cond_init %s", strerror(rtn));
>> exit(1);
>> }
>>
>> if ((rtn = pthread_cond_init(&(threadpool->queue_empty), NULL))
>> != 0) {
>> fprintf (stderr, "pthread_cond_init %s", strerror(rtn));
>> exit(1);
>> }
>>
>> /* Create threads */
>> for (i = 0; i != num_worker_threads; i++) {
>> if ((rtn = pthread_create(&(threadpool->threads[i]),
>> NULL, (void *) (*threadpool_thread),
>> (void *) threadpool)) != 0) {
>> fprintf (stderr, "pthread_create %d", rtn);
>> exit(1);
>> }
>> }
>>
>> *threadpoolp = threadpool;
>>
>> ************************************************** ************************
>>
>>
>> void threadpool_thread(threadpool_t threadpool) {
>>
>> threadpool_work_t *my_workp;
>>
>> for(;;) {
>> pthread_mutex_lock(&(threadpool->queue_lock));
>>
>> printf("Cur queue size:: %d\n",
>> threadpool->cur_queue_size);
>> printf("Max queue size:: %d\n",
>> threadpool->max_queue_size);
>> printf("Cur shutdown :: %d\n",
>> (!(threadpool->shutdown)));
>> while((threadpool->cur_queue_size == 0) &&
>> (!threadpool->shutdown)) {
>> printf("Waiting for pthread_cond_wait
>> queue_not_empty:: %d\n", threadpool->queue_not_empty);
>>
>> pthread_cond_wait(&(threadpool->queue_not_empty),
>> &(threadpool->queue_lock));
>> }
>>
>> if (threadpool->shutdown) {
>> pthread_mutex_unlock(&(threadpool->queue_lock));
>> pthread_exit(NULL);
>> }
>>
>> my_workp = threadpool->queue_head;
>> threadpool->cur_queue_size--;
>> if (threadpool->cur_queue_size == 0) {
>> threadpool->queue_head = threadpool->queue_tail = NULL;
>> } else {
>> threadpool->queue_head = my_workp->next;
>> }
>>
>> if ((!threadpool->do_not_block_when_full) &&
>> (threadpool->cur_queue_size =
>> (threadpool->max_queue_size -1))) {
>>
>> pthread_cond_signal(&(threadpool->queue_not_full));
>> }
>>
>> if (threadpool->cur_queue_size == 0) {
>> pthread_cond_signal(&(threadpool->queue_empty));
>> }
>>
>> pthread_mutex_unlock(&(threadpool->queue_lock));
>> (*(my_workp->routine))(my_workp->arg);
>> free(my_workp);
>> }
>> }
>>
>> ************************************************** ************************
>>
>> int threadpool_add_work(threadpool_t threadpool, void *routine, void
>> *arg) {
>>
>> threadpool_work_t *workp;
>> pthread_mutex_lock(&threadpool->queue_lock);
>>
>> if((threadpool->cur_queue_size == threadpool->max_queue_size) &&
>> threadpool->do_not_block_when_full) {
>> pthread_mutex_unlock(&threadpool->queue_lock);
>> return -1;
>> }
>>
>> while((threadpool->cur_queue_size ==
>> threadpool->max_queue_size) &&
>> ((threadpool->shutdown || threadpool->queue_closed))) {
>>
>> printf("Waiting for pthread_cond_wait queue_not_full::
>> %d\n", threadpool->queue_not_full);
>> pthread_cond_wait(&threadpool->queue_not_full,
>> &threadpool->queue_lock);
>> }
>>
>> if(threadpool->shutdown || threadpool->queue_closed) {
>> printf("Threadpool_add_work::Unlocking mutex:: %d\n",
>> threadpool->queue_lock);
>> pthread_mutex_unlock(&threadpool->queue_lock);
>> return -1;
>> }
>>
>>> /* Allocate work structure */
>>> workp = (threadpool_work_t *) malloc(sizeof(threadpool_work_t));
>>> workp->routine = routine;
>>> workp->arg = arg;
>>> workp->next = NULL;
>>>
>>> if (threadpool->cur_queue_size == 0) {
>>> threadpool->queue_tail = threadpool->queue_head =workp;
>>> printf("Signal for pthread_cond_wait waiting on
>>> queue_not_empty:: %d\n", threadpool->queue_not_empty);
>>> pthread_cond_signal(&threadpool->queue_not_empty);
>>> } else {
>>> (threadpool->queue_tail)-> next = workp;
>>> threadpool->queue_tail = workp;
>>> }
>>
>> threadpool->cur_queue_size++;
>> pthread_mutex_unlock(&threadpool->queue_lock);
>> return 1;
>> }
>> ************************************************** ************************
>>
>>
>> The header file threadpool.h looks as below -
>>
>> ************************************************** ************************
>>
>> typedef struct threadpool_work {
>> void (*routine) ();
>> void *arg;
>> struct threadpool_work *next;
>> } threadpool_work_t;
>>
>> typedef struct threadpool {
>> /* Pool Characteristics */
>> int num_threads;
>> int max_queue_size;
>>
>> int do_not_block_when_full;
>> pthread_t *threads;
>> int cur_queue_size;
>> threadpool_work_t *queue_head;
>> threadpool_work_t *queue_tail;
>>
>> pthread_mutex_t queue_lock;
>> pthread_cond_t queue_not_empty;
>> pthread_cond_t queue_not_full;
>> pthread_cond_t queue_empty;
>>
>> int queue_closed;
>> int shutdown;
>> } *threadpool_t;
>>
>> void threadpool_init(threadpool_t *threadpoolp,
>> int num_worker_threads,
>> int max_queue_size,
>> int do_not_block_when_full);
>>
>> int threadpool_add_work(threadpool_t threadpool,
>> void *routine,
>> void *arg);
>>
>> int threadpool_destroy(threadpool_t threadpoolp, int finish);
>>
>> void threadpool_thread(threadpool_t threadpool);
>> ************************************************** ************************
>>
>>
>> Now as can be seen "workp" is allocated structure in the above code
>> and initialized the 'routine' to routine, 'arg' to arg and 'next' a
>> pointer to NULL respectively.
>>
>> > I am not sure what you mean by "which is itself", but in general,
>> > allocating alone leaves the value uninitialized. If the value happens
>> > to be a struct, then its members are uninitialized. Accessing an
>> > uninitialized member may just yield garbage if it happens to be a
>> > number, but if it is a pointer, the consequences may be more dire,
>> > as you have experienced.
>>
>> Now i see your point that 'next' which itself is a pointer to struct
>> and its members are not initialized. But threadpool->queue_tail which
>> has struct element 'next' is NOT initialized to which I am trying to
>> allocate 'workp'.
>>
>> This threadpool is declared at the beginning of the main method and
>> then allocated before calling thread_init(....).
>>
>> So how do I initialize
>>
>> (threadpool->queue_tail) which points to a structure
>> threadpool_work_t and whose element - 'next' is pointer to
>> threadpool_work_t ?
>>
>>
>>
>> Thanks
>> Mahendra
>>
>>
>> Peter Pichler wrote:
>>> [You really did not need to post your question 3 times. Your posts
>>> may not appear immediately, that's just the way it is. You may need
>>> to wait a bit.]
>>>
>>> Mahendra Kumar Kutare wrote:
>>>
>>>> I am trying to implement a webserver with boss-worker model thread
>>>> pool implementation -
>>>
>>> My first reaction was, "oh no, another off-topic question," but in
>>> fact yours is perfectly topical.
>>>
>>> <snip>
>>>
>>>> /* Allocate a pool data structure */
>>>> if ((threadpool = (threadpool_t) malloc(sizeof(struct threadpool))) ==
>>>> NULL) {
>>>
>>> In C, casting malloc is not required and may hide a problem.
>>> The preferred way is 'type *p = malloc(sizeof *p);'. That way, even if type
>>> changes, the statement will remain safe. In your case, you could do
>>>
>>> if ((threadpool = malloc(sizeof(*threadpool))) == /*...*/
>>>
>>>> perror("malloc");
>>>> exit(1);
>>>> }
>>>>
>>>> threadpool_add_work function has following code -
>>>>
>>>> if (threadpool->cur_queue_size == 0) {
>>>> threadpool->queue_tail = threadpool->queue_head =workp;
>>>
>>> What's workp? Is it initialized to a valid threadpool?
>>>
>>>> printf("Signal for pthread_cond_wait waiting on
>>>> queue_not_empty:: %d\n", threadpool->queue_not_empty);
>>>> pthread_cond_signal(&threadpool->queue_not_empty);
>>>> } else {
>>>> (threadpool->queue_tail)-> next = workp;
>>> ^^^^^^^
>>> Because if it isn't - BANG!
>>>
>>>> threadpool->queue_tail = workp;
>>>> }
>>>>
>>>> For the first HTTP request, it will find threadpool->cur_queue_size ==
>>>> 0 and hence will execute the IF block of the above code, but for any
>>>> subsequent request it will execute ELSE block above.
>>>>
>>>> When I am sending 2nd request, ITS SEGFAULTING at
>>>> (threadpool->queue_tail)-> next = workp;
>>>>
>>>> I debugged it through gdb and when trying to print following -
>>>>
>>>> (threadpool->queue_tail)-> next
>>>>
>>>> it throws - Can not access memory
>>>
>>> If it does, then the implementors obviously could not spell.
>>> It should be "cannot" (one word).
>>>
>>>> Which i believe is because - Memory is not allocated to the structure
>>>> threadpool self element.
>>>
>>> Not so much not allocated as not initialized, I believe.
>>>
>>>> My Question is - When I allocate memory in the beginning to threadpool
>>>> structure do i have to still allocate memory for any element which is
>>>> itself in this case. ? If so, how do i do that ?
>>>
>>> I am not sure what you mean by "which is itself", but in general,
>>> allocating alone leaves the value uninitialized. If the value happens
>>> to be a struct, then its members are uninitialized. Accessing an
>>> uninitialized member may just yield garbage if it happens to be a
>>> number, but if it is a pointer, the consequences may be more dire,
>>> as you have experienced.
>>>
>>>> Or is there something else which I am missing here ?
>>>
>>> Probably initializing your workp. Since I do not know where it came
>>> from, I cannot be sure.
>>>
>>> Peter
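To illustrate the points about initialization and queue bookkeeping raised in this thread, here is a minimal single-threaded sketch of a head/tail work queue. The names work_sketch_t, queue_sketch_t, queue_init, queue_push and queue_pop are invented for illustration. It assumes no locking; the real pool would hold queue_lock around these steps.

```c
#include <stdlib.h>

typedef struct work_sketch {
    void *arg;
    struct work_sketch *next;
} work_sketch_t;

typedef struct queue_sketch {
    int cur_size;
    work_sketch_t *head;
    work_sketch_t *tail;
} queue_sketch_t;

/* Every member gets a defined value before the queue is used. */
void queue_init(queue_sketch_t *q)
{
    q->cur_size = 0;
    q->head = NULL;
    q->tail = NULL;
}

/* Append one item. Returns 0 on success, -1 if malloc fails. */
int queue_push(queue_sketch_t *q, void *arg)
{
    work_sketch_t *w = malloc(sizeof(*w));
    if (w == NULL)
        return -1;
    w->arg = arg;
    w->next = NULL;            /* never leave the link uninitialized */

    if (q->cur_size == 0) {    /* comparison (==), not assignment (=) */
        q->head = q->tail = w;
    } else {
        q->tail->next = w;
        q->tail = w;
    }
    q->cur_size++;
    return 0;
}

/* Remove and return the oldest item, or NULL if the queue is empty.
 * The caller frees the returned item. */
work_sketch_t *queue_pop(queue_sketch_t *q)
{
    work_sketch_t *w = q->head;
    if (w == NULL)
        return NULL;
    q->cur_size--;
    if (q->cur_size == 0)
        q->head = q->tail = NULL;
    else
        q->head = w->next;
    return w;
}
```

Because cur_size is only ever changed by the increment in queue_push and the decrement in queue_pop, it stays in step with head and tail, which is the invariant the accidental assignment in threadpool_thread() broke.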