Canonical Voices

Posts tagged with 'readdirectorychangesw'

mandel

Tim-Erwin gave me a nice riddle in the comments of Bitten in the ass by ReadDirectoryChangesW and multithreading, which I really appreciate. His problem was with the following code:

import win32file
import win32con
import win32event
import pywintypes
 
FILE_LIST_DIRECTORY = 0x0001
folder = r"C:\some folder to watch"
 
handle = win32file.CreateFile (folder, FILE_LIST_DIRECTORY,
              win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
              None, win32con.OPEN_EXISTING,
              win32con.FILE_FLAG_BACKUP_SEMANTICS, None)
buffer = win32file.AllocateReadBuffer(1024)
overlapped = pywintypes.OVERLAPPED()
overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
flags = win32con.FILE_NOTIFY_CHANGE_FILE_NAME |\
                  win32con.FILE_NOTIFY_CHANGE_DIR_NAME |\
                  win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |\
                  win32con.FILE_NOTIFY_CHANGE_SIZE |\
                  win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |\
                  win32con.FILE_NOTIFY_CHANGE_SECURITY
 
win32file.ReadDirectoryChangesW(handle, buffer, False, flags, overlapped)
print "never reached until a change is done in the folder"
win32event.WaitForSingleObject(overlapped.hEvent, 5000)

As Tim-Erwin said in his comments, the above code was not blocking in the WaitForSingleObject call but in the ReadDirectoryChangesW one. This is clearly my fault, because in the blog post I left out a very important detail that is needed to get the code working async: it is imperative that you pass the FILE_FLAG_OVERLAPPED flag to the CreateFile function. Without it the handle is opened for synchronous IO and ReadDirectoryChangesW blocks even when you give it an OVERLAPPED structure. That means that in order to fix the code and be blocked in WaitForSingleObject we have to modify the CreateFile call in the following way:

handle = win32file.CreateFile (folder, FILE_LIST_DIRECTORY,
              win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
              None, win32con.OPEN_EXISTING,
              win32con.FILE_FLAG_BACKUP_SEMANTICS | win32file.FILE_FLAG_OVERLAPPED , None)

That fixes Tim-Erwin's problem and should help anyone else with the same issue. Sorry for not being explicit about such an important detail Tim-Erwin, please accept my apologies :) .
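A cheap way to catch this mistake early is to check the flags before they ever reach CreateFile. The sketch below is only an illustration: the helper name require_overlapped is made up for this post, and the two constants are written out with their values from the Windows headers so that the snippet runs without pywin32.

```python
# Values from the Windows SDK headers; win32con / win32file expose the same
# constants, they are repeated here so the sketch runs without pywin32.
FILE_FLAG_BACKUP_SEMANTICS = 0x02000000  # needed to open a directory
FILE_FLAG_OVERLAPPED = 0x40000000        # needed for async (overlapped) IO


def require_overlapped(flags):
    """Hypothetical guard: refuse flags that would make the read block."""
    if not flags & FILE_FLAG_OVERLAPPED:
        raise ValueError('FILE_FLAG_OVERLAPPED missing: '
                         'ReadDirectoryChangesW would block')
    return flags


# the flags from the broken snippet and from the fixed one
broken = FILE_FLAG_BACKUP_SEMANTICS
fixed = FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED
```

Passing fixed through the guard returns it untouched, while broken raises a ValueError instead of silently blocking later on.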

mandel

Last week was probably one of the best coding sprints I have had since I started working at Canonical, I’m serious! I had the luck to pair program with alecu on the FilesystemMonitor that we use in Ubuntu One on Windows. The implementation has improved so much that I wanted to blog about it and show it as an example of how to hook the ReadDirectoryChangesW call from the Win32 API into twisted so that you can process the events using twisted, which is bloody cool.

We have reduced the implementation of the Watch and WatchManager to match our needs and trimmed the API, since we do not use everything that pyinotify provides. The Watch implementation is as follows:

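import logging
import os
from uuid import uuid4

from twisted.internet import defer, reactor
from pywintypes import OVERLAPPED, error
from win32api import CloseHandle
from win32con import (
    FILE_FLAG_BACKUP_SEMANTICS,
    FILE_SHARE_READ,
    FILE_SHARE_WRITE,
    OPEN_EXISTING)
from win32event import (
    CreateEvent,
    INFINITE,
    SetEvent,
    WaitForMultipleObjects,
    WAIT_OBJECT_0)
from win32file import (
    AllocateReadBuffer,
    CreateFile,
    FILE_FLAG_OVERLAPPED,
    FILE_NOTIFY_INFORMATION,
    GetOverlappedResult,
    ReadDirectoryChangesW)

# TRACE, Event, WINDOWS_ACTIONS, the IN_* masks, is_valid_windows_path and
# get_syncdaemon_valid_path come from the ubuntuone code base.
FILE_LIST_DIRECTORY = 0x0001

class Watch(object):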
    """Implement the same functions as pyinotify.Watch."""
 
    def __init__(self, watch_descriptor, path, mask, auto_add, processor,
        buf_size=8192):
        super(Watch, self).__init__()
        self.log = logging.getLogger('ubuntuone.SyncDaemon.platform.windows.' +
            'filesystem_notifications.Watch')
        self.log.setLevel(TRACE)
        self._processor = processor
        self._buf_size = buf_size
        self._wait_stop = CreateEvent(None, 0, 0, None)
        self._overlapped = OVERLAPPED()
        self._overlapped.hEvent = CreateEvent(None, 0, 0, None)
        self._watching = False
        self._descriptor = watch_descriptor
        self._auto_add = auto_add
        self._ignore_paths = []
        self._cookie = None
        self._source_pathname = None
        self._process_thread = None
        # remember the subdirs we have so that when we have a delete we can
        # check if it was a remove
        self._subdirs = []
        # ensure that we work with an abspath and that we can deal with
        # long paths over 260 chars.
        if not path.endswith(os.path.sep):
            path += os.path.sep
        self._path = os.path.abspath(path)
        self._mask = mask
        # this deferred is fired when the watch has started monitoring
        # a directory from a thread
        self._watch_started_deferred = defer.Deferred()
 
    @is_valid_windows_path(path_indexes=[1])
    def _path_is_dir(self, path):
        """Check if the path is a dir and update the local subdir list."""
        self.log.debug('Testing if path %r is a dir', path)
        is_dir = False
        if os.path.exists(path):
            is_dir = os.path.isdir(path)
        else:
            self.log.debug('Path "%s" was deleted subdirs are %s.',
                path, self._subdirs)
            # we removed the path, we look in the internal list
            if path in self._subdirs:
                is_dir = True
                self._subdirs.remove(path)
        if is_dir:
            self.log.debug('Adding %s to subdirs %s', path, self._subdirs)
            self._subdirs.append(path)
        return is_dir
 
    def _process_events(self, events):
        """Process the events from the queue."""
        # do not do it if we stop watching and the events are empty
        if not self._watching:
            return
 
        # we transform the events to be the same as the one in pyinotify
        # and then use the proc_fun
        for action, file_name in events:
            if any([file_name.startswith(path)
                        for path in self._ignore_paths]):
                continue
            # map the windows events to the pyinotify ones, this is dirty but
            # makes the multiplatform better, linux was first :P
            syncdaemon_path = get_syncdaemon_valid_path(
                                        os.path.join(self._path, file_name))
            is_dir = self._path_is_dir(os.path.join(self._path, file_name))
            if is_dir:
                self._subdirs.append(file_name)
            mask = WINDOWS_ACTIONS[action]
            head, tail = os.path.split(file_name)
            if is_dir:
                mask |= IN_ISDIR
            event_raw_data = {
                'wd': self._descriptor,
                'dir': is_dir,
                'mask': mask,
                'name': tail,
                'path': '.'}
            # by the way in which the win api fires the events we know for
            # sure that no move events will be added in the wrong order, this
            # is kind of hacky, I don't like it too much
            if WINDOWS_ACTIONS[action] == IN_MOVED_FROM:
                self._cookie = str(uuid4())
                self._source_pathname = tail
                event_raw_data['cookie'] = self._cookie
            if WINDOWS_ACTIONS[action] == IN_MOVED_TO:
                event_raw_data['src_pathname'] = self._source_pathname
                event_raw_data['cookie'] = self._cookie
            event = Event(event_raw_data)
            # FIXME: event deduces the pathname wrong and we need to manually
            # set it
            event.pathname = syncdaemon_path
            # add the event only if we do not have an exclude filter or
            # the exclude filter returns False, that is, the event will not
            # be excluded
            self.log.debug('Event is %s.', event)
            self._processor(event)
 
    def _call_deferred(self, f, *args):
        """Execute the deferred call avoiding possible race conditions."""
        if not self._watch_started_deferred.called:
            # unpack the arguments, f(args) would pass a single tuple
            f(*args)
 
    def _watch(self):
        """Watch a path that is a directory."""
        # we are going to be using the ReadDirectoryChangesW which requires
        # a directory handle and the mask to be used.
        handle = CreateFile(
            self._path,
            FILE_LIST_DIRECTORY,
            FILE_SHARE_READ | FILE_SHARE_WRITE,
            None,
            OPEN_EXISTING,
            FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
            None)
        self.log.debug('Watching path %s.', self._path)
        while True:
            # important information to know about the parameters:
            # param 1: the handle to the dir
            # param 2: the size to be used in the kernel to store events
            # that might be lost while the call is being performed. This
            # is complicated to fine tune since if you make lots of watchers
            # you might use too much memory and make your OS BSOD
            buf = AllocateReadBuffer(self._buf_size)
            try:
                ReadDirectoryChangesW(
                    handle,
                    buf,
                    self._auto_add,
                    self._mask,
                    self._overlapped,
                )
                reactor.callFromThread(self._call_deferred,
                    self._watch_started_deferred.callback, True)
            except error as e:
                # the handle is invalid, this may occur if we decided to
                # stop watching before we go in the loop, lets get out of it
                # and pass the exception instance (not the class) along
                reactor.callFromThread(self._call_deferred,
                    self._watch_started_deferred.errback, e)
                break
            # wait for an event and ensure that we either stop or read the
            # data
            rc = WaitForMultipleObjects((self._wait_stop,
                                         self._overlapped.hEvent),
                                         0, INFINITE)
            if rc == WAIT_OBJECT_0:
                # Stop event
                break
            # if we continue, it means that we got some data, lets read it
            data = GetOverlappedResult(handle, self._overlapped, True)
            # lets read the data and store it in the results
            events = FILE_NOTIFY_INFORMATION(buf, data)
            self.log.debug('Events from ReadDirectoryChangesW are %s', events)
            reactor.callFromThread(self._process_events, events)
 
        CloseHandle(handle)
 
    @is_valid_windows_path(path_indexes=[1])
    def ignore_path(self, path):
        """Add the path of the events to ignore."""
        if not path.endswith(os.path.sep):
            path += os.path.sep
        if path.startswith(self._path):
            path = path[len(self._path):]
            self._ignore_paths.append(path)
 
    @is_valid_windows_path(path_indexes=[1])
    def remove_ignored_path(self, path):
        """Reaccept path."""
        if not path.endswith(os.path.sep):
            path += os.path.sep
        if path.startswith(self._path):
            path = path[len(self._path):]
            if path in self._ignore_paths:
                self._ignore_paths.remove(path)
 
    def start_watching(self):
        """Tell the watch to start processing events."""
        for current_child in os.listdir(self._path):
            full_child_path = os.path.join(self._path, current_child)
            if os.path.isdir(full_child_path):
                self._subdirs.append(full_child_path)
        # start a thread to watch the path, the events are processed in
        # the reactor main thread.
        self.log.debug('Start watching path.')
        self._watching = True
        reactor.callInThread(self._watch)
        return self._watch_started_deferred
 
    def stop_watching(self):
        """Tell the watch to stop processing events."""
        self.log.info('Stop watching %s', self._path)
        SetEvent(self._wait_stop)
        self._watching = False
        self._subdirs = []
 
    def update(self, mask, auto_add=False):
        """Update the info used by the watcher."""
        self.log.debug('update(%s, %s)', mask, auto_add)
        self._mask = mask
        self._auto_add = auto_add
 
    @property
    def path(self):
        """Return the path watched."""
        return self._path
 
    @property
    def auto_add(self):
        return self._auto_add
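The translation done inside _process_events can be tried out in isolation. The following is a minimal sketch, not the ubuntuone code: the ACTIONS table is my guess at the shape of WINDOWS_ACTIONS, the translate function is made up for this post, and the constants are written out (FILE_ACTION_* as documented for FILE_NOTIFY_INFORMATION, IN_* with the inotify values that pyinotify uses) so that it runs anywhere.

```python
from uuid import uuid4

# FILE_ACTION_* values as documented for FILE_NOTIFY_INFORMATION.
FILE_ACTION_ADDED = 1
FILE_ACTION_REMOVED = 2
FILE_ACTION_MODIFIED = 3
FILE_ACTION_RENAMED_OLD_NAME = 4
FILE_ACTION_RENAMED_NEW_NAME = 5

# inotify mask values, the same ones pyinotify exposes.
IN_MODIFY = 0x00000002
IN_MOVED_FROM = 0x00000040
IN_MOVED_TO = 0x00000080
IN_CREATE = 0x00000100
IN_DELETE = 0x00000200
IN_ISDIR = 0x40000000

# Assumption: roughly what the WINDOWS_ACTIONS table looks like.
ACTIONS = {
    FILE_ACTION_ADDED: IN_CREATE,
    FILE_ACTION_REMOVED: IN_DELETE,
    FILE_ACTION_MODIFIED: IN_MODIFY,
    FILE_ACTION_RENAMED_OLD_NAME: IN_MOVED_FROM,
    FILE_ACTION_RENAMED_NEW_NAME: IN_MOVED_TO,
}


def translate(events, is_dir=lambda name: False):
    """Turn (action, name) pairs into inotify-like dicts, pairing renames."""
    cookie = None
    source = None
    result = []
    for action, name in events:
        mask = ACTIONS[action]
        data = {'name': name, 'mask': mask}
        if is_dir(name):
            data['mask'] |= IN_ISDIR
        if mask == IN_MOVED_FROM:
            # remember the old name until the matching new name arrives
            cookie = str(uuid4())
            source = name
            data['cookie'] = cookie
        elif mask == IN_MOVED_TO:
            data['cookie'] = cookie
            data['src_pathname'] = source
        result.append(data)
    return result


# a rename arrives as an old-name event followed by a new-name event
moved = translate([(FILE_ACTION_RENAMED_OLD_NAME, u'a.txt'),
                   (FILE_ACTION_RENAMED_NEW_NAME, u'b.txt')])
```

Both halves of the rename end up sharing one cookie, which is what lets the consumer treat them as a single move, and it only works because the old-name event arrives before the new-name one.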

The important details of this implementation are the following:

Use a deferred to notify that the watch started.

During our tests we noticed that the start watch function was slow, which meant that between the point where we asked to watch the directory and the point where the thread had actually started we would be losing events. The function now returns a deferred that is fired once ReadDirectoryChangesW has been called, which ensures that no events will be lost. The interesting parts are the following:

Define the deferred:

        # this deferred is fired when the watch has started monitoring
        # a directory from a thread
        self._watch_started_deferred = defer.Deferred()

Fire the deferred when we have successfully started watching:

            buf = AllocateReadBuffer(self._buf_size)
            try:
                ReadDirectoryChangesW(
                    handle,
                    buf,
                    self._auto_add,
                    self._mask,
                    self._overlapped,
                )
                reactor.callFromThread(self._call_deferred,
                    self._watch_started_deferred.callback, True)

Call its errback when we do have an error:

            except error as e:
                # the handle is invalid, this may occur if we decided to
                # stop watching before we go in the loop, lets get out of it
                # and pass the exception instance (not the class) along
                reactor.callFromThread(self._call_deferred,
                    self._watch_started_deferred.errback, e)
                break

Threading and firing the reactor.

There is an interesting detail to take care of in this code. We have to ensure that the deferred is not fired more than once; to do that you have to callFromThread a function that fires the deferred only when it has not already been fired, like this:

    def _call_deferred(self, f, *args):
        """Execute the deferred call avoiding possible race conditions."""
        if not self._watch_started_deferred.called:
            # unpack the arguments, f(args) would pass a single tuple
            f(*args)

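If you do not do the above but use the code below, you will have a race condition in which the deferred is called more than once: the check runs in the watcher thread while the callback runs later in the reactor thread, so the loop can go around again before the deferred has been marked as called.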

            buf = AllocateReadBuffer(self._buf_size)
            try:
                ReadDirectoryChangesW(
                    handle,
                    buf,
                    self._auto_add,
                    self._mask,
                    self._overlapped,
                )
                if not self._watch_started_deferred.called:
                    reactor.callFromThread(self._watch_started_deferred.callback, True)
            except error:
                # the handle is invalid, this may occur if we decided to
                # stop watching before we go in the loop, lets get out of it
                if not self._watch_started_deferred.called:
                    reactor.callFromThread(self._watch_started_deferred.errback, error)
                break
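The race is easier to see with the threads taken out of the picture. The sketch below is a toy model and not twisted: FakeDeferred and FakeReactor are stand-ins written for this post, the fake callFromThread only queues the call the way the real one hands it over to the reactor thread, and RuntimeError plays the role of twisted's AlreadyCalledError.

```python
class FakeDeferred(object):
    """Stand-in for a Deferred: firing it twice is an error."""

    def __init__(self):
        self.called = False

    def callback(self, result):
        if self.called:
            # twisted raises AlreadyCalledError at this point
            raise RuntimeError('already called')
        self.called = True


class FakeReactor(object):
    """Stand-in for the reactor: callFromThread only queues the call."""

    def __init__(self):
        self.pending = []

    def callFromThread(self, f, *args):
        self.pending.append((f, args))

    def run_pending(self):
        """Run the queued calls and count how many of them blew up."""
        errors = 0
        for f, args in self.pending:
            try:
                f(*args)
            except RuntimeError:
                errors += 1
        self.pending = []
        return errors


def racy(reactor, deferred):
    # the check happens in the watcher thread, the fire happens later
    if not deferred.called:
        reactor.callFromThread(deferred.callback, True)


def safe(reactor, deferred):
    # check and fire happen together, inside the reactor thread
    def fire_once(f, *args):
        if not deferred.called:
            f(*args)
    reactor.callFromThread(fire_once, deferred.callback, True)


def simulate(schedule):
    reactor, deferred = FakeReactor(), FakeDeferred()
    # the watcher loop goes around twice before the reactor gets to run
    schedule(reactor, deferred)
    schedule(reactor, deferred)
    return reactor.run_pending()


racy_errors = simulate(racy)
safe_errors = simulate(safe)
```

With the racy version both checks pass before anything has run, so the second queued callback fails; with the safe version the second call simply finds the deferred already fired.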

Execute the processing of events in the reactor main thread.

Alecu has bloody great ideas way too often, and this is one of his. The processing of the events is queued to be executed in the twisted reactor main thread, which reduces the number of threads we use and ensures that the events are processed in the correct order.

            # if we continue, it means that we got some data, lets read it
            data = GetOverlappedResult(handle, self._overlapped, True)
            # lets read the data and store it in the results
            events = FILE_NOTIFY_INFORMATION(buf, data)
            self.log.debug('Events from ReadDirectoryChangesW are %s', events)
            reactor.callFromThread(self._process_events, events)
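For those wondering what FILE_NOTIFY_INFORMATION(buf, data) does with the raw bytes before the events reach the reactor, here is a rough pure Python equivalent. It is a sketch based on the documented layout of the FILE_NOTIFY_INFORMATION structure (three DWORDs followed by a UTF-16 name whose length is given in bytes), not the pywin32 implementation, and both function names are made up for this post.

```python
import struct


def parse_notify_buffer(buf, nbytes):
    """Decode FILE_NOTIFY_INFORMATION records into (action, name) pairs."""
    results = []
    offset = 0
    while offset < nbytes:
        # NextEntryOffset, Action, FileNameLength: three little-endian DWORDs
        next_offset, action, name_len = struct.unpack_from('<III', buf, offset)
        start = offset + 12
        # the name is UTF-16, not null terminated, name_len is in bytes
        name = bytes(buf[start:start + name_len]).decode('utf-16-le')
        results.append((action, name))
        if next_offset == 0:
            # a zero offset marks the last record
            break
        offset += next_offset
    return results


def pack_record(action, name, last=False):
    """Hypothetical helper: build one DWORD-aligned record to test with."""
    raw = name.encode('utf-16-le')
    size = 12 + len(raw)
    padded = (size + 3) & ~3
    next_offset = 0 if last else padded
    record = struct.pack('<III', next_offset, action, len(raw)) + raw
    return record + b'\x00' * (padded - size)


# action 1 is FILE_ACTION_ADDED, action 3 is FILE_ACTION_MODIFIED
sample = pack_record(1, u'new.txt') + pack_record(3, u'old.txt', last=True)
events = parse_notify_buffer(sample, len(sample))
```

The result is the same list of (action, file_name) tuples that _process_events loops over.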

Just for this the flight to Buenos Aires was well worth it!!! Anyone who wants to see the full code can look at ubuntuone.platform.windows in ubuntuone.

mandel

At the moment some of the tests (and I cannot point out which ones) of ubuntuone-client fail when they are run on Windows. The reason is the way in which we get the notifications out of the file system combined with the way the tests are written. Before I blame the OS or the tests, let me explain a number of facts about the Windows filesystem and the possible ways to interact with it.

To be able to get file system changes from the OS the Win32 API provides the following:

SHChangeNotifyRegister

This function was broken until Vista, where it was fixed. Unfortunately AFAIK we also support Windows XP, which means that we cannot trust this function. On top of that, taking this path means that we can have a performance issue. Because the function is built on top of Windows messages, if too many changes occur the sync daemon would start receiving roll-up messages that just state that something changed, and it would be up to the sync daemon to work out what really happened. Therefore we can all agree that this is a no no, right?

FindFirstChangeNotification

This is a really easy function to use which is based on ReadDirectoryChangesW (I think it is a simple wrapper around it) that lets you know that something changed but gives no information about what changed. Because it is based on ReadDirectoryChangesW it suffers from the same issues.

ReadDirectoryChangesW

This is by far the most common way to get change notifications from the system. Now, in theory there are two possible cases which can go wrong that would affect the events raised by this function:

  1. There are too many events, the buffer overflows and we start losing events. A simple way to solve this issue is to process the events in a different thread asap so that we can keep reading the changes.
  2. We use the sync version of the function, which means that we could have the following issues:
    • Blue screen of death because we used too much memory from the kernel space.
    • We cannot close the handles used to watch the changes in the directories. This leaves the threads blocked.
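The hand-off mentioned in the first item can be sketched with nothing but the standard library. This is an illustration and not the Ubuntu One code: the reader function stands in for the loop that calls ReadDirectoryChangesW, the batches are made-up (action, name) tuples, and all the names are hypothetical.

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2


def reader(batches, pending):
    """Stand-in for the watch loop: hand each batch over and keep reading."""
    for batch in batches:
        # put() returns at once, so we get back to reading without
        # waiting for the batch to be processed
        pending.put(batch)
    pending.put(None)  # sentinel, there is nothing more to read


def worker(pending, processed):
    """Do the slow processing away from the reading loop."""
    while True:
        batch = pending.get()
        if batch is None:
            break
        processed.extend(batch)


pending = queue.Queue()
processed = []
batches = [[(1, u'a')], [(3, u'a'), (2, u'b')]]
threads = [threading.Thread(target=reader, args=(batches, pending)),
           threading.Thread(target=worker, args=(pending, processed))]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Because a single worker drains the queue, the events come out in the same order in which they were read.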

As I mentioned this is the theory, and therefore it makes perfect sense to choose this option as the way to get notified of the changes until… you hit a great little feature of Windows called write-behind caching. The idea of write-behind caching is the following one:

When you attempt to write a new file on your HD Windows does not directly modify the HD. Instead it makes a note of the fact that your intention is to write on disk and saves your changes in memory. Isn’t that smart?

Well, that lovely feature comes enabled by default AFAIK from XP onwards. Any smart person would wonder how that interacts with FindFirstChangeNotification/ReadDirectoryChangesW; well, after some work here is what I have managed to find out:

The IO Manager (internal to the kernel) queues up disk-write requests in an internal buffer, and the actual changes are not physically committed until some condition is met, which I believe is the “write-behind caching” feature at work. The problem appears to be that the user-space callback via FileSystemWatcher/ReadDirectoryChangesW does not occur when disk-write requests are inserted into the queue, but rather when they leave the queue and are physically committed to disk. From what I have been able to gather through observation, the lifetime of a queue is based on:

  1. Whether more writes are being inserted in the queue.
  2. Whether another app requests a read of an item in the queue.

This means that when using FileSystemWatcher/ReadDirectoryChangesW the events are fired only when the changes are actually committed, and for a user-space program this follows a non-deterministic process (insert Spanish swearing here). A way to work around this issue is to use the FlushFileBuffers function on the volume, which does need admin rights, yeah!
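Something that does not need admin rights is flushing a single file instead of the whole volume. The sketch below is an assumption on my side and not something we have in the tests: it relies on os.fsync, which according to the Python documentation uses the MS _commit() function on Windows, and whether it makes the notifications arrive any sooner is untested. The helper name write_and_commit is made up.

```python
import os
import tempfile


def write_and_commit(path, data):
    """Write data and ask the OS to commit it before returning."""
    with open(path, 'wb') as fd:
        fd.write(data)
        fd.flush()             # push Python's own buffer to the OS
        os.fsync(fd.fileno())  # ask the OS to commit its cached writes


# try it out in a throwaway directory
tmp_dir = tempfile.mkdtemp()
target = os.path.join(tmp_dir, 'example.txt')
write_and_commit(target, b'hello')
with open(target, 'rb') as fd:
    content = fd.read()
```

A test could use a helper like this when it creates the files it expects events for, instead of waiting for the cache to decide when to write.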

Change Journal records

Well, this allows us to track the changes that have been committed in an NTFS system (that means that we do not have support for FAT). This technique keeps track of the changes using an update sequence number, in an interesting manner. At first look, although parsing the data is hard, this solution seems to be very similar to the one used by pyinotify, and therefore someone will say: hey, let's just tell twisted to do a select on that file and read the changes. Well, no, it is not that easy: files do not provide the functionality used for select, just sockets (http://msdn.microsoft.com/en-us/library/aa363803%28VS.85%29.aspx) /me jumps of happiness

File system filter

Well, this is an easy one to summarize: you have to write a driver-like piece of code. That means C, COM and being able to crash the entire system with a nice blue screen (although I can change the color to aubergine before we crash)

Conclusion

At this point I hope I have convinced a few of you that ReadDirectoryChangesW is the best option to take, but you might be wondering why I mentioned the write-behind caching feature. Well, here comes my complaint about the tests. We do use the real file system notifications for testing and the trial test cases do have a timeout! Those two facts plus the lovely write-behind caching feature mean that the tests on Windows fail just because the bloody events are not raised until they leave the queue of the IO manager.

mandel

During the past few days I have been trying to track down an issue in the Ubuntu One client tests that, when run on Windows, would use all the threads that the python process could have. As you can imagine, finding out why there are deadlocks is quite hard, especially when I thought that the code was thread safe. Guess what? It wasn’t.

The bug I had in the code was related to the way in which ReadDirectoryChangesW works. This function has two different ways to be executed:

Synchronous

ReadDirectoryChangesW can be executed in sync mode by NOT providing an OVERLAPPED structure to perform the IO operations, for example:

def _watcherThread(self, dn, dh, changes):
        flags = win32con.FILE_NOTIFY_CHANGE_FILE_NAME
        while 1:
            print "waiting", dh
            # blocks until something changes; keep the result in its own
            # name so that the 'changes' list passed in is not shadowed
            new_changes = win32file.ReadDirectoryChangesW(dh,
                                                          8192,
                                                          False,
                                                          flags)
            print "got", new_changes
            changes.extend(new_changes)

The above example has the following two problems:

  • ReadDirectoryChangesW without an OVERLAPPED blocks infinitely.
  • If another thread attempts to close the handle while ReadDirectoryChangesW is waiting on it, the CloseHandle() method blocks (which has nothing to do with the GIL – it is correctly managed)

I got bitten in the ass by the second item, which broke my tests in two different ways: it left threads blocked, and it left handles open so that the rest of the tests could not remove the tmp directories that were still in use by the blocked threads.

Asynchronous

In order to use the async version of the function we just have to use an OVERLAPPED structure (the directory handle must also have been opened with FILE_FLAG_OVERLAPPED); this way the IO operations will not block and we will also be able to close the handle from a different thread.

def _watcherThreadOverlapped(self, dn, dh, changes):
        flags = win32con.FILE_NOTIFY_CHANGE_FILE_NAME
        buf = win32file.AllocateReadBuffer(8192)
        overlapped = pywintypes.OVERLAPPED()
        overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
        while 1:
            win32file.ReadDirectoryChangesW(dh,
                                            buf,
                                            False, #sub-tree
                                            flags,
                                            overlapped)
            # Wait for our event, or for 5 seconds.
            rc = win32event.WaitForSingleObject(overlapped.hEvent, 5000)
            if rc == win32event.WAIT_OBJECT_0:
                # got some data!  Must use GetOverlappedResult to find out
                # how much is valid!  0 generally means the handle has
                # been closed.  Blocking is OK here, as the event has
                # already been set.
                nbytes = win32file.GetOverlappedResult(dh, overlapped, True)
                if nbytes:
                    bits = win32file.FILE_NOTIFY_INFORMATION(buf, nbytes)
                    changes.extend(bits)
                else:
                    # This is "normal" exit - our 'tearDown' closes the
                    # handle.
                    # print "looks like dir handle was closed!"
                    return
            else:
                print "ERROR: Watcher thread timed-out!"
                return # kill the thread!

Using the ReadDirectoryChangesW function in this way solves all the issues found in the sync version, and the only extra overhead is that you need to understand how to deal with Win32 event objects, which is not that hard after you have worked with them for a little while.

I leave this here for people that might find the same issue and for me to remember how much my ass hurt.
