1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
class Watch(object):
    """Implement the same functions as pyinotify.Watch.

    A watch monitors a single directory with ReadDirectoryChangesW from a
    thread started through the reactor, translates the reported changes
    into pyinotify-like events and hands each of them to the processor
    callable in the reactor thread.
    """

    def __init__(self, watch_descriptor, path, mask, auto_add, processor,
                 buf_size=8192):
        """Create a watch for path; nothing is monitored until started.

        watch_descriptor -- identifier reported as 'wd' in every event.
        path -- directory to watch; stored as an absolute path.
        mask -- notify filter handed to ReadDirectoryChangesW.
        auto_add -- handed to ReadDirectoryChangesW as its third argument
            (the watch-subtree flag in the pywin32 signature).
        processor -- callable invoked with each translated event.
        buf_size -- size of the buffer allocated for every read.
        """
        super(Watch, self).__init__()
        self.log = logging.getLogger('ubuntuone.SyncDaemon.platform.windows.' +
                                     'filesystem_notifications.Watch')
        self.log.setLevel(TRACE)
        self._processor = processor
        self._buf_size = buf_size
        # signalled by stop_watching to get the watching thread out of its
        # wait
        self._wait_stop = CreateEvent(None, 0, 0, None)
        self._overlapped = OVERLAPPED()
        self._overlapped.hEvent = CreateEvent(None, 0, 0, None)
        self._watching = False
        self._descriptor = watch_descriptor
        self._auto_add = auto_add
        self._ignore_paths = []
        # cookie and source name of the last IN_MOVED_FROM event, used to
        # pair it with the IN_MOVED_TO event that follows
        self._cookie = None
        self._source_pathname = None
        self._process_thread = None
        # remember the subdirs we have so that when we have a delete we can
        # check if it was a remove
        self._subdirs = []
        # ensure that we work with an abspath and that we can deal with
        # long paths over 260 chars.
        # NOTE(review): os.path.abspath may normalise the trailing separator
        # away for paths that are not literal ('\\?\') paths; ignore_path
        # relies on self._path ending with a separator -- confirm.
        if not path.endswith(os.path.sep):
            path += os.path.sep
        self._path = os.path.abspath(path)
        self._mask = mask
        # this deferred is fired when the watch has started monitoring
        # a directory from a thread
        self._watch_started_deferred = defer.Deferred()

    def _remember_subdir(self, path):
        """Add path to the known subdirs unless it is already there."""
        if path not in self._subdirs:
            self.log.debug('Adding %s to subdirs %s', path, self._subdirs)
            self._subdirs.append(path)

    @is_valid_windows_path(path_indexes=[1])
    def _path_is_dir(self, path):
        """Check if the path is a dir and update the local subdir list.

        A path that still exists is checked on disk. A path that is gone
        is considered a dir when it was previously remembered as one.
        Return True when the path is (or was) a directory.
        """
        self.log.debug('Testing if path %r is a dir', path)
        if os.path.exists(path):
            is_dir = os.path.isdir(path)
        else:
            self.log.debug('Path "%s" was deleted subdirs are %s.',
                           path, self._subdirs)
            # the path is gone from disk, we look in the internal list
            # NOTE(review): the entry is kept in the list (the previous
            # code removed it and appended it right back); confirm whether
            # deleted dirs should be forgotten instead.
            is_dir = path in self._subdirs
        if is_dir:
            # each dir is stored once, otherwise the list grows with every
            # event of the same directory
            self._remember_subdir(path)
        return is_dir

    def _process_events(self, events):
        """Process the events from the queue.

        events is an iterable of (action, file_name) pairs. Each pair that
        is not under an ignored path is converted into an Event and passed
        to the processor. Nothing is done when the watch is stopped.
        """
        # do not do it if we stop watching and the events are empty
        if not self._watching:
            return
        # we transform the events to be the same as the one in pyinotify
        # and then use the proc_fun
        for action, file_name in events:
            if any(file_name.startswith(ignored)
                   for ignored in self._ignore_paths):
                continue
            # map the windows events to the pyinotify ones, this is dirty
            # but makes the multiplatform better, linux was first :P
            full_path = os.path.join(self._path, file_name)
            syncdaemon_path = get_syncdaemon_valid_path(full_path)
            is_dir = self._path_is_dir(full_path)
            if is_dir and file_name not in self._subdirs:
                self._subdirs.append(file_name)
            action_mask = WINDOWS_ACTIONS[action]
            mask = action_mask
            if is_dir:
                mask |= IN_ISDIR
            tail = os.path.split(file_name)[1]
            event_raw_data = {
                'wd': self._descriptor,
                'dir': is_dir,
                'mask': mask,
                'name': tail,
                'path': '.'}
            # by the way in which the win api fires the events we know for
            # sure that no move events will be added in the wrong order, this
            # is kind of hacky, I dont like it too much
            if action_mask == IN_MOVED_FROM:
                self._cookie = str(uuid4())
                self._source_pathname = tail
                event_raw_data['cookie'] = self._cookie
            if action_mask == IN_MOVED_TO:
                event_raw_data['src_pathname'] = self._source_pathname
                event_raw_data['cookie'] = self._cookie
            event = Event(event_raw_data)
            # FIXME: event deduces the pathname wrong and we need to manually
            # set it
            event.pathname = syncdaemon_path
            self.log.debug('Event is %s.', event)
            self._processor(event)

    def _call_deferred(self, f, *args):
        """Execute the deferred call avoiding possible race conditions.

        f is only called, with args unpacked, while the start deferred has
        not been fired yet; later calls are silently dropped.
        """
        if not self._watch_started_deferred.called:
            f(*args)

    def _watch(self):
        """Watch a path that is a directory.

        Runs in its own thread until the stop event is signalled or the
        read cannot be issued. The start deferred is fired from the
        reactor thread: with True once the first read is issued, or with
        the raised error when issuing the read fails.
        """
        # we are going to be using the ReadDirectoryChangesW which requires
        # a directory handle and the mask to be used.
        handle = CreateFile(
            self._path,
            FILE_LIST_DIRECTORY,
            FILE_SHARE_READ | FILE_SHARE_WRITE,
            None,
            OPEN_EXISTING,
            FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
            None)
        self.log.debug('Watching path %s.', self._path)
        try:
            while True:
                # the buffer size is complicated to fine tune: the buffer
                # is what stores the events that happen while we are not
                # reading, but lots of watches with big buffers can use too
                # much memory
                buf = AllocateReadBuffer(self._buf_size)
                try:
                    ReadDirectoryChangesW(
                        handle,
                        buf,
                        self._auto_add,
                        self._mask,
                        self._overlapped,
                    )
                except error as exc:
                    # the handle is invalid, this may occur if we decided
                    # to stop watching before we go in the loop, lets get
                    # out of it reporting the actual error instance
                    reactor.callFromThread(
                        self._call_deferred,
                        self._watch_started_deferred.errback, exc)
                    break
                reactor.callFromThread(
                    self._call_deferred,
                    self._watch_started_deferred.callback, True)
                # wait for an event and ensure that we either stop or read
                # the data
                rc = WaitForMultipleObjects((self._wait_stop,
                                             self._overlapped.hEvent),
                                            0, INFINITE)
                if rc == WAIT_OBJECT_0:
                    # Stop event
                    break
                # if we continue, it means that we got some data, lets read
                # it
                data = GetOverlappedResult(handle, self._overlapped, True)
                # lets read the data and store it in the results
                events = FILE_NOTIFY_INFORMATION(buf, data)
                self.log.debug('Events from ReadDirectoryChangesW are %s',
                               events)
                reactor.callFromThread(self._process_events, events)
        finally:
            # release the directory handle however we leave the loop
            CloseHandle(handle)

    def _as_ignored_entry(self, path):
        """Return path in the form used inside the ignore list.

        The path gets a trailing separator and, when it starts with the
        watched path, that prefix is stripped.
        """
        if not path.endswith(os.path.sep):
            path += os.path.sep
        if path.startswith(self._path):
            path = path[len(self._path):]
        return path

    @is_valid_windows_path(path_indexes=[1])
    def ignore_path(self, path):
        """Add the path of the events to ignore."""
        self._ignore_paths.append(self._as_ignored_entry(path))

    @is_valid_windows_path(path_indexes=[1])
    def remove_ignored_path(self, path):
        """Reaccept path; unknown paths are silently skipped."""
        entry = self._as_ignored_entry(path)
        if entry in self._ignore_paths:
            self._ignore_paths.remove(entry)

    def start_watching(self):
        """Tell the watch to start processing events.

        Return the deferred that is fired once the watching thread has
        issued its first read (or failed to do so).
        """
        # remember the directories that are already there so that a later
        # delete of one of them can be told apart from a file delete
        children = (os.path.join(self._path, child)
                    for child in os.listdir(self._path))
        self._subdirs.extend(child for child in children
                             if os.path.isdir(child))
        # the watching is done in a thread, the events are processed back
        # in the reactor thread
        self.log.debug('Start watching path.')
        self._watching = True
        reactor.callInThread(self._watch)
        return self._watch_started_deferred

    def stop_watching(self):
        """Tell the watch to stop processing events."""
        self.log.info('Stop watching %s', self._path)
        SetEvent(self._wait_stop)
        self._watching = False
        self._subdirs = []

    def update(self, mask, auto_add=False):
        """Update the info used by the watcher."""
        self.log.debug('update(%s, %s)', mask, auto_add)
        self._mask = mask
        self._auto_add = auto_add

    @property
    def path(self):
        """Return the path watched."""
        return self._path

    @property
    def auto_add(self):
        """Return the auto_add value currently used by the watch."""
        return self._auto_add