platform_utils.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. #
  2. # Copyright (C) 2016 The Android Open Source Project
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import os
  16. import platform
  17. import select
  18. from Queue import Queue
  19. from threading import Thread
  20. def isWindows():
  21. """ Returns True when running with the native port of Python for Windows,
  22. False when running on any other platform (including the Cygwin port of
  23. Python).
  24. """
  25. # Note: The cygwin port of Python returns "CYGWIN_NT_xxx"
  26. return platform.system() == "Windows"
  27. class FileDescriptorStreams(object):
  28. """ Platform agnostic abstraction enabling non-blocking I/O over a
  29. collection of file descriptors. This abstraction is required because
  30. fctnl(os.O_NONBLOCK) is not supported on Windows.
  31. """
  32. @classmethod
  33. def create(cls):
  34. """ Factory method: instantiates the concrete class according to the
  35. current platform.
  36. """
  37. if isWindows():
  38. return _FileDescriptorStreamsThreads()
  39. else:
  40. return _FileDescriptorStreamsNonBlocking()
  41. def __init__(self):
  42. self.streams = []
  43. def add(self, fd, dest, std_name):
  44. """ Wraps an existing file descriptor as a stream.
  45. """
  46. self.streams.append(self._create_stream(fd, dest, std_name))
  47. def remove(self, stream):
  48. """ Removes a stream, when done with it.
  49. """
  50. self.streams.remove(stream)
  51. @property
  52. def is_done(self):
  53. """ Returns True when all streams have been processed.
  54. """
  55. return len(self.streams) == 0
  56. def select(self):
  57. """ Returns the set of streams that have data available to read.
  58. The returned streams each expose a read() and a close() method.
  59. When done with a stream, call the remove(stream) method.
  60. """
  61. raise NotImplementedError
  62. def _create_stream(fd, dest, std_name):
  63. """ Creates a new stream wrapping an existing file descriptor.
  64. """
  65. raise NotImplementedError
  66. class _FileDescriptorStreamsNonBlocking(FileDescriptorStreams):
  67. """ Implementation of FileDescriptorStreams for platforms that support
  68. non blocking I/O.
  69. """
  70. class Stream(object):
  71. """ Encapsulates a file descriptor """
  72. def __init__(self, fd, dest, std_name):
  73. self.fd = fd
  74. self.dest = dest
  75. self.std_name = std_name
  76. self.set_non_blocking()
  77. def set_non_blocking(self):
  78. import fcntl
  79. flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
  80. fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
  81. def fileno(self):
  82. return self.fd.fileno()
  83. def read(self):
  84. return self.fd.read(4096)
  85. def close(self):
  86. self.fd.close()
  87. def _create_stream(self, fd, dest, std_name):
  88. return self.Stream(fd, dest, std_name)
  89. def select(self):
  90. ready_streams, _, _ = select.select(self.streams, [], [])
  91. return ready_streams
  92. class _FileDescriptorStreamsThreads(FileDescriptorStreams):
  93. """ Implementation of FileDescriptorStreams for platforms that don't support
  94. non blocking I/O. This implementation requires creating threads issuing
  95. blocking read operations on file descriptors.
  96. """
  97. def __init__(self):
  98. super(_FileDescriptorStreamsThreads, self).__init__()
  99. # The queue is shared accross all threads so we can simulate the
  100. # behavior of the select() function
  101. self.queue = Queue(10) # Limit incoming data from streams
  102. def _create_stream(self, fd, dest, std_name):
  103. return self.Stream(fd, dest, std_name, self.queue)
  104. def select(self):
  105. # Return only one stream at a time, as it is the most straighforward
  106. # thing to do and it is compatible with the select() function.
  107. item = self.queue.get()
  108. stream = item.stream
  109. stream.data = item.data
  110. return [stream]
  111. class QueueItem(object):
  112. """ Item put in the shared queue """
  113. def __init__(self, stream, data):
  114. self.stream = stream
  115. self.data = data
  116. class Stream(object):
  117. """ Encapsulates a file descriptor """
  118. def __init__(self, fd, dest, std_name, queue):
  119. self.fd = fd
  120. self.dest = dest
  121. self.std_name = std_name
  122. self.queue = queue
  123. self.data = None
  124. self.thread = Thread(target=self.read_to_queue)
  125. self.thread.daemon = True
  126. self.thread.start()
  127. def close(self):
  128. self.fd.close()
  129. def read(self):
  130. data = self.data
  131. self.data = None
  132. return data
  133. def read_to_queue(self):
  134. """ The thread function: reads everything from the file descriptor into
  135. the shared queue and terminates when reaching EOF.
  136. """
  137. for line in iter(self.fd.readline, b''):
  138. self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, line))
  139. self.fd.close()
  140. self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, None))