«  7.7. Deadlock   ::   Contents   ::   8.1. Synchronization Patterns and Problems  »

7.8. Extended Example: Event Log File

Many concurrent systems use a common log file to store records of key events. For instance, journaling file systems use a log to keep track of changes that can be validated in case of a system crash. This extended example uses multiple threads, each of which generates random fake events and appends them to a common log file. Every entry is followed by a SHA-256 hash of the file contents that precede it, and each thread verifies the most recent hash before appending a new entry. Access to the log file is synchronized with a mutex to ensure that the writes do not interfere with each other.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

/* Number of hexadecimal characters in a printed SHA-256 digest */
#define SHALENGTH 64
/* Path of the shared log file, relative to the working directory */
#define LOGFILE "var.log"

/* Change these to increase the number of threads and events */
#define NUM_THREADS 5
#define NUM_ITERATIONS 10

/* Forward declarations of the helpers defined after main */

/* Append one indexed message and a fresh hash line to the log */
void append_log_message (FILE *, size_t, char *);
/* Hash the log contents, skipping a number of trailing bytes; the
   returned string is heap-allocated and must be freed */
char * compute_log_hash (FILE *, size_t);
/* Replace any existing log with a fixed first entry and its hash */
void create_initial_log (void);
/* Build a heap-allocated, timestamped message for the given thread */
char * generate_random_log (int);
/* Check that the last hash line in the log matches its contents */
bool verify_log (FILE *);

/* Thread parameter struct. main fills in one instance per thread and
   passes its address to event_thread through pthread_create. The
   pointer fields refer to objects shared by every thread. */
struct log_params {
  int tid;                /* thread ID; also seeds the thread's rand_r state */
  pthread_mutex_t *mutex; /* shared mutex held while the log is open */
  size_t *log_index;      /* shared index of the next log entry */
  size_t iterations;      /* number of events this thread generates */
};

/* Event-generating thread. Each copy of this thread will
   sleep for a random amount of time, then wake up to generate
   an event. When that happens, the thread acquires the lock
   for the log file, verifies that the log is still valid,
   appends a new log message, and closes the file. After a
   certain number of iterations has occurred, the thread exits.

   _params points to this thread's struct log_params. The thread
   always terminates through pthread_exit (NULL). If the log cannot
   be opened or fails verification, a diagnostic is written to
   stderr, the lock is released, and the thread exits early. */
void *
event_thread (void * _params)
{
  struct log_params *params = (struct log_params *) _params;

  /* Seeding with the thread ID gives each thread its own
     reproducible sequence of sleep times */
  unsigned seed = (unsigned) params->tid;

  for (size_t i = 0; i < params->iterations; i++)
    {
      /* Sleep between 1 and 500000 microseconds */
      usleep ((rand_r (&seed) % 500000) + 1);
      pthread_mutex_lock (params->mutex);

      /* Here begins the critical section. "a+" allows reading the
         existing contents while forcing all writes to the end. */
      FILE *fp = fopen (LOGFILE, "a+");
      if (fp == NULL)
        {
          /* fopen sets errno, so perror is meaningful here */
          perror ("Failed to open log file");
          pthread_mutex_unlock (params->mutex);
          pthread_exit (NULL);
        }

      if (! verify_log (fp))
        {
          /* A hash mismatch does not set errno, so report it without
             a misleading strerror suffix */
          fprintf (stderr, "Failed to validate log file\n");
          fclose (fp);
          pthread_mutex_unlock (params->mutex);
          pthread_exit (NULL);
        }

      /* All previous log messages have hashed correctly. Append
         a new message, which includes an updated hash value. */
      char *log_message = generate_random_log (params->tid);
      append_log_message (fp, *params->log_index, log_message);
      free (log_message);

      /* Increment shared log index number and close the file. The
         index is only touched while the mutex is held. */
      (*params->log_index)++;
      fclose (fp);

      /* End of critical section */
      pthread_mutex_unlock (params->mutex);
    }
  pthread_exit (NULL);
}

/* Main entry point. Creates the initial log, starts NUM_THREADS
   event threads that share one mutex and one log index, waits for
   them to finish, and cleans up. Returns EXIT_SUCCESS if every
   thread was started, EXIT_FAILURE otherwise. */
int
main (void)
{
  /* Create a bogus initial log to start with. Entry 0 is written
     there, so the threads start numbering at 1. */
  create_initial_log ();
  size_t log_index = 1;

  pthread_mutex_t mutex;
  int rc = pthread_mutex_init (&mutex, NULL);
  if (rc != 0)
    {
      fprintf (stderr, "Failed to initialize mutex: %s\n", strerror (rc));
      return EXIT_FAILURE;
    }

  /* Create the thread parameters and the threads. The params array
     lives in main's frame, which outlives every thread because main
     joins them before returning. */
  pthread_t threads[NUM_THREADS];
  struct log_params params[NUM_THREADS];
  size_t created = 0;
  int status = EXIT_SUCCESS;
  for (size_t i = 0; i < NUM_THREADS; i++)
    {
      params[i].tid = (int) i;
      params[i].mutex = &mutex;
      params[i].log_index = &log_index;
      params[i].iterations = NUM_ITERATIONS;

      /* pthread_create returns an error number rather than setting
         errno. Stop at the first failure so that only valid thread
         handles are joined below. */
      rc = pthread_create (&threads[i], NULL, event_thread, &params[i]);
      if (rc != 0)
        {
          fprintf (stderr, "Failed to create thread %zu: %s\n", i,
                   strerror (rc));
          status = EXIT_FAILURE;
          break;
        }
      created++;
    }

  /* Join the threads that were started, then destroy the mutex */
  for (size_t i = 0; i < created; i++)
    pthread_join (threads[i], NULL);

  pthread_mutex_destroy (&mutex);
  return status;
}

/* Log file verification routine. Recomputes the hash of everything
   in the log except the final SHALENGTH + 2 bytes (the last hash
   line plus the blank line after it) and compares it with the hash
   stored in those final bytes.

   fp must be open for reading on the log file. Returns true only if
   a full-length hash was both computed and read back and the two
   match; returns false on mismatch or if either step fails. */
bool
verify_log (FILE *fp)
{
  /* Compute the hash value from all lines except the last bytes
     that contain the final hash */
  char *computed = compute_log_hash (fp, SHALENGTH + 2);
  if (computed == NULL)
    return false;

  /* Now read the final hash and confirm that it matches. This relies
     on compute_log_hash leaving the stream positioned just past the
     bytes it hashed, i.e. at the start of the stored hash. */
  char expected[SHALENGTH + 3];
  memset (expected, 0, sizeof (expected));
  size_t got = fread (expected, sizeof (char), SHALENGTH + 2, fp);

  /* Require complete digests on both sides: an empty computed string
     compared against a zero-filled buffer would otherwise look like
     a match */
  bool valid = strlen (computed) == SHALENGTH
               && got >= SHALENGTH
               && strncmp (computed, expected, SHALENGTH) == 0;

  /* Release the digest on every path, including mismatch */
  free (computed);
  return valid;
}

/* Append a message to the log file, followed by a hash line.

   fp      - log file stream, open for reading and appending
   index   - entry number printed in front of the message
   message - NUL-terminated text of the entry (not modified)

   The entry is written as "<index>: <message>\n". The hash of the
   whole file, including the new entry, is then appended and followed
   by a blank line. If the hash cannot be computed, a diagnostic is
   printed to stderr and no hash line is written. */
void
append_log_message (FILE *fp, size_t index, char *message)
{
  /* Add the message at the end of the file and flush the data
     to ensure it gets written to disk; compute_log_hash sizes the
     file from the file system, so the entry must be there first */
  fseek (fp, 0, SEEK_END);
  fprintf (fp, "%zu: %s\n", index, message);
  fflush (fp);

  /* Re-compute the hash with the new message and append it to
     the end of the file */
  char *computed = compute_log_hash (fp, 0);
  if (computed == NULL)
    {
      fprintf (stderr, "Failed to compute hash for log entry %zu\n", index);
      return;
    }

  /* Hashing moved the stream position, so seek back to the end
     before switching from reading to writing */
  fseek (fp, 0, SEEK_END);
  fprintf (fp, "%s\n\n", computed);
  fflush (fp);
  free (computed);
}

/* Write all len bytes of buf to fd, resuming after short writes and
   interrupted calls. Returns true once everything has been written,
   false on any other error. */
static bool
write_all (int fd, const char *buf, size_t len)
{
  while (len > 0)
    {
      ssize_t written = write (fd, buf, len);
      if (written < 0)
        {
          if (errno == EINTR)
            continue;
          return false;
        }
      buf += written;
      len -= (size_t) written;
    }
  return true;
}

/* For a given file, compute the SHA-256 hash sum of the contents.
   The trailing parameter indicates the number of bytes at the
   end of the file to omit from the computation. Returns a
   dynamically allocated string of SHALENGTH hexadecimal characters
   that must be freed, or NULL if the hash could not be computed.

   The hash is produced by piping the data through the external
   "shasum -a 256 -" command in a child process. The file size is
   taken from the file system, so buffered output on fp must be
   flushed before calling. On success the stream is left positioned
   immediately after the last byte that was hashed. */
char *
compute_log_hash (FILE *fp, size_t trailing)
{
  enum { CHUNK_SIZE = 1024 };
  const size_t digest_len = SHALENGTH;

  /* Allocate pipes for parent-to-child and child-to-parent
     communication and create a child process */
  int p2c_pipe[2], c2p_pipe[2];
  if (pipe (p2c_pipe) < 0)
    return NULL;
  if (pipe (c2p_pipe) < 0)
    {
      close (p2c_pipe[0]);
      close (p2c_pipe[1]);
      return NULL;
    }

  pid_t child_pid = fork ();
  if (child_pid < 0)
    {
      close (p2c_pipe[0]);
      close (p2c_pipe[1]);
      close (c2p_pipe[0]);
      close (c2p_pipe[1]);
      return NULL;
    }

  /* Child process will read data from the pipe, redirecting this
     data as stdin to the shasum program. The stdout will be
     redirected through the pipe back to the parent. */
  if (child_pid == 0)
    {
      close (p2c_pipe[1]);
      close (c2p_pipe[0]);
      if (dup2 (p2c_pipe[0], STDIN_FILENO) >= 0
          && dup2 (c2p_pipe[1], STDOUT_FILENO) >= 0)
        execlp ("shasum", "shasum", "-a", "256", "-", (char *) NULL);

      /* Only reached if the redirection or the exec failed. Leave
         immediately so the child never runs the parent's code below;
         _exit avoids flushing stdio buffers inherited from the
         parent. */
      _exit (127);
    }

  /* Parent resumes from here by closing the unused pipe ends */
  close (p2c_pipe[0]);
  close (c2p_pipe[1]);

  /* Determine how many bytes to hash: the file size minus the
     trailing bytes to omit. A file no larger than the omitted part
     contributes nothing instead of wrapping around. */
  bool ok = true;
  size_t remaining = 0;
  struct stat file_info;
  if (fstat (fileno (fp), &file_info) < 0 || fseek (fp, 0, SEEK_SET) != 0)
    ok = false;
  else if (file_info.st_size > 0 && (size_t) file_info.st_size > trailing)
    remaining = (size_t) file_info.st_size - trailing;

  /* Instead of reading the entire file contents (which could be
     large), read a chunk at a time and write the contents into
     the pipe for the child to access. Only the bytes actually read
     are forwarded. */
  char buffer[CHUNK_SIZE];
  while (ok && remaining > 0)
    {
      size_t want = remaining < sizeof (buffer) ? remaining : sizeof (buffer);
      size_t got = fread (buffer, sizeof (char), want, fp);
      if (got == 0 || ! write_all (p2c_pipe[1], buffer, got))
        ok = false;
      else
        remaining -= got;
    }

  /* All data have been written, so close the pipe so the child
     sees end-of-file and can finalize its calculation */
  close (p2c_pipe[1]);

  /* Read the computed shasum back; the digest is the first
     SHALENGTH characters of the child's output */
  char shabuf[SHALENGTH + 1];
  memset (shabuf, 0, sizeof (shabuf));
  size_t have = 0;
  while (ok && have < digest_len)
    {
      ssize_t count = read (c2p_pipe[0], shabuf + have, digest_len - have);
      if (count < 0 && errno == EINTR)
        continue;
      if (count <= 0)
        {
          /* Error or end-of-file before a full digest arrived */
          ok = false;
          break;
        }
      have += (size_t) count;
    }
  close (c2p_pipe[0]);

  /* Reap the child so that repeated calls do not accumulate zombie
     processes, and treat an abnormal or failing exit as an error */
  int status = 0;
  while (waitpid (child_pid, &status, 0) < 0 && errno == EINTR)
    continue;
  if (! WIFEXITED (status) || WEXITSTATUS (status) != 0)
    ok = false;

  return ok ? strdup (shabuf) : NULL;
}

/* asctime_r returns 24 characters plus \n and \0 */
#define ASCTIME_BUFSIZE 26

/* Build a log message stamped with the current local time, like:
     [Wed Jun 26 10:32:15 1996] Event from thread 5

   tid is the thread number printed at the end of the message.
   Returns a heap-allocated, NUL-terminated string that the caller
   must free. Allocation failure is caught by an assertion. */
char *
generate_random_log (int tid)
{
  enum { MESSAGE_SIZE = 100 };

  /* Determine the current local time and convert it to an
     ASCII-printable format, using the reentrant variants because
     several threads call this function */
  char timebuf[ASCTIME_BUFSIZE];
  memset (timebuf, 0, sizeof (timebuf));
  struct tm localbuf;
  memset (&localbuf, 0, sizeof (localbuf));

  time_t current = time (NULL);
  const char *timemsg = NULL;
  struct tm *local = localtime_r (&current, &localbuf);
  if (local != NULL)
    timemsg = asctime_r (local, timebuf);
  if (timemsg == NULL)
    timemsg = "unknown time";

  char *message = calloc (MESSAGE_SIZE, sizeof (char));
  assert (message != NULL);

  /* The precision keeps only the first 24 characters of the time
     string, which drops the newline that asctime_r appends */
  snprintf (message, MESSAGE_SIZE, "[%.*s] Event from thread %d",
            ASCTIME_BUFSIZE - 2, timemsg, tid);

  /* The buffer is already heap-allocated, so hand it to the caller
     directly instead of returning a copy and leaking the original */
  return message;
}

/* Create bogus initial commit message to have a starting point.
   Any existing log file is removed and replaced by a file holding
   entry 0, its precomputed hash, and a blank line, which is the
   layout verify_log expects. Exits the program with a diagnostic if
   the file cannot be created or written. */
void
create_initial_log (void)
{
  static const char entry[] =
    "0: [Tue Jun 25 22:09:41 2019] Initial commit\n";
  static const char digest[] =
    "6231aaee038b4ab9b3fd50e5bf604f26"
    "97af98b5c126f4f86f5aaca809439c65\n\n";

  /* Failure is ignored here: the file may simply not exist yet, and
     O_TRUNC below discards old contents if removal did not work */
  unlink (LOGFILE);

  /* O_CREAT requires an explicit mode argument */
  int file = open (LOGFILE, O_RDWR | O_CREAT | O_TRUNC, 0644);
  if (file < 0)
    {
      perror ("Failed to create log file");
      exit (EXIT_FAILURE);
    }

  /* The mode passed to open is filtered by the umask, so set the
     final permissions explicitly */
  fchmod (file, 0644);

  /* sizeof - 1 excludes the terminating NUL of each literal */
  if (write (file, entry, sizeof (entry) - 1) != (ssize_t) (sizeof (entry) - 1)
      || write (file, digest, sizeof (digest) - 1)
           != (ssize_t) (sizeof (digest) - 1))
    {
      perror ("Failed to write initial log");
      close (file);
      exit (EXIT_FAILURE);
    }
  close (file);
}
«  7.7. Deadlock   ::   Contents   ::   8.1. Synchronization Patterns and Problems  »

Contact Us License