Skip to content

liveserver

Class info

Classes

Name Children Inherits
LiveServer
mkdocs_mknodes.liveserver
    _Handler
    mkdocs_mknodes.liveserver

      🛈 DocStrings

      LiveServer

      Bases: ThreadingMixIn, WSGIServer

      Source code in mkdocs_mknodes/liveserver.py
       88
       89
       90
       91
       92
       93
       94
       95
       96
       97
       98
       99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      154
      155
      156
      157
      158
      159
      160
      161
      162
      163
      164
      165
      166
      167
      168
      169
      170
      171
      172
      173
      174
      175
      176
      177
      178
      179
      180
      181
      182
      183
      184
      185
      186
      187
      188
      189
      190
      191
      192
      193
      194
      195
      196
      197
      198
      199
      200
      201
      202
      203
      204
      205
      206
      207
      208
      209
      210
      211
      212
      213
      214
      215
      216
      217
      218
      219
      220
      221
      222
      223
      224
      225
      226
      227
      228
      229
      230
      231
      232
      233
      234
      235
      236
      237
      238
      239
      240
      241
      242
      243
      244
      245
      246
      247
      248
      249
      250
      251
      252
      253
      254
      255
      256
      257
      258
      259
      260
      261
      262
      263
      264
      265
      266
      267
      268
      269
      270
      271
      272
      273
      274
      275
      276
      277
      278
      279
      280
      281
      282
      283
      284
      285
      286
      287
      288
      289
      290
      291
      292
      293
      294
      295
      296
      297
      298
      299
      300
      301
      302
      303
      304
      305
      306
      307
      308
      309
      310
      311
      312
      313
      314
      315
      316
      317
      318
      319
      320
      321
      322
      323
      324
      325
      326
      327
      328
      329
      330
      331
      332
      333
      334
      335
      336
      337
      338
      339
      340
      341
      342
      class LiveServer(socketserver.ThreadingMixIn, wsgiref.simple_server.WSGIServer):
          daemon_threads = True
          poll_response_timeout = 60
      
          def __init__(
              self,
              builder: Callable[[], None],
              host: str,
              port: int,
              root: str | os.PathLike[str],
              mount_path: str = "/",
              polling_interval: float = 0.5,
              shutdown_delay: float = 0.25,
          ) -> None:
              self.builder = builder
              with contextlib.suppress(Exception):
                  if isinstance(ipaddress.ip_address(host), ipaddress.IPv6Address):
                      self.address_family = socket.AF_INET6
              self.root = upath.UPath(root).resolve()
              self.url = _serve_url(host, port, mount_path)
              self.build_delay = 0.1
              self.shutdown_delay = shutdown_delay
      
              super().__init__((host, port), _Handler, bind_and_activate=False)
              self.set_app(self.serve_request)
      
              self._wanted_epoch = _timestamp()
              """Version of the site that started building."""
              self._visible_epoch = self._wanted_epoch
              """Latest fully built version of the site."""
              self._epoch_cond = threading.Condition()
              """Hold this lock when accessing _visible_epoch."""
              self._want_rebuild: bool = False
              self._rebuild_cond = threading.Condition()
              """Hold this lock when accessing _want_rebuild."""
              self._shutdown = False
              self.serve_thread = threading.Thread(
                  target=lambda: self.serve_forever(shutdown_delay)
              )
              self.observer = watchdog.observers.polling.PollingObserver(
                  timeout=polling_interval
              )
      
              self._watched_paths: dict[str, int] = {}
              self._watch_refs: dict[str, Any] = {}
      
          def error_handler(self, code: int) -> bytes | None:
              if code not in (404, 500):
                  return None
              error_page = self.root / f"{code}.html"
              return error_page.read_bytes() if error_page.is_file() else None
      
          def watch(self, path: str | os.PathLike[str], *, recursive: bool = True) -> None:
              """Add the 'path' to watched paths.
      
              Args:
                  path: The path to watch.
                  recursive: Whether to watch the path recursively
              """
              path = str(upath.UPath(path).resolve())
              if path in self._watched_paths:
                  self._watched_paths[path] += 1
                  return
              self._watched_paths[path] = 1
      
              def callback(event: watchdog.events.FileSystemEvent):
                  if event.is_directory:
                      return
                  logger.debug(str(event))
                  with self._rebuild_cond:
                      self._want_rebuild = True
                      self._rebuild_cond.notify_all()
      
              handler = watchdog.events.FileSystemEventHandler()
              handler.on_any_event = callback  # type: ignore[method-assign]
              logger.debug("Watching %r", path)
              self._watch_refs[path] = self.observer.schedule(
                  handler, path, recursive=recursive
              )
      
          def unwatch(self, path: str | os.PathLike[str]) -> None:
              """Stop watching file changes for path.
      
              Raises if there was no corresponding `watch` call.
      
              Args:
                  path: The path to unwatch
              """
              path = str(upath.UPath(path).resolve())
      
              self._watched_paths[path] -= 1
              if self._watched_paths[path] <= 0:
                  self._watched_paths.pop(path)
                  self.observer.unschedule(self._watch_refs.pop(path))
      
          def serve(self, *, open_in_browser: bool = False):
              self.server_bind()
              self.server_activate()
      
              if self._watched_paths:
                  self.observer.start()
                  paths = (f"'{_try_relativize_path(path)}'" for path in self._watched_paths)
                  paths_str = ", ".join(paths)
                  logger.info("Watching paths for changes: %s", paths_str)
      
              if open_in_browser:
                  logger.info("Serving on %s and opening it in a browser", self.url)
              else:
                  logger.info("Serving on %s", self.url)
              self.serve_thread.start()
              if open_in_browser:
                  webbrowser.open(str(self.url))
      
              self._build_loop()
      
          def _build_loop(self):
              while True:
                  with self._rebuild_cond:
                      while not self._rebuild_cond.wait_for(
                          lambda: self._want_rebuild or self._shutdown,
                          timeout=self.shutdown_delay,
                      ):  # We use loop + timeout instead of one wait since we need occasional
                          # breaks on Windows to recive KeyboardInterrupt
                          pass
                      if self._shutdown:
                          break
                      logger.info("Detected file changes")
                      while self._rebuild_cond.wait(timeout=self.build_delay):
                          logger.debug("Waiting for file changes to stop happening")
      
                      self._wanted_epoch = _timestamp()
                      self._want_rebuild = False
      
                  try:
                      self.builder()
                  except Exception as e:
                      if isinstance(e, SystemExit):
                          print(e, file=sys.stderr)
                          print(e, file=sys.stderr)
                      else:
                          traceback.print_exc()
                      logger.exception(
                          "An error happened during the rebuild."
                          "The server will appear stuck until build errors are resolved."
                      )
                      continue
      
                  with self._epoch_cond:
                      logger.info("Reloading browsers")
                      self._visible_epoch = self._wanted_epoch
                      self._epoch_cond.notify_all()
      
          def shutdown(self, wait: bool = False) -> None:
              self.observer.stop()
              with self._rebuild_cond:
                  self._shutdown = True
                  self._rebuild_cond.notify_all()
      
              if self.serve_thread.is_alive():
                  super().shutdown()
              self.server_close()
              if wait:
                  self.serve_thread.join()
                  self.observer.join()
      
          def serve_request(
              self, environ: dict[str, str], start_response: Callable[..., Any]
          ) -> Iterable[bytes]:
              try:
                  result = self._serve_request(environ, start_response)
              except Exception:
                  code = 500
                  msg = "500 Internal Server Error"
                  logger.exception(msg)
              else:
                  if result is not None:
                      return result
                  code = 404
                  msg = "404 Not Found"
      
              error_content = None
              try:
                  error_content = self.error_handler(code)
              except Exception:
                  logger.exception("Failed to render an error message!")
              if error_content is None:
                  error_content = msg.encode()
      
              start_response(msg, [("Content-Type", "text/html")])
              return [error_content]
      
          def _serve_request(
              self, environ: dict[str, str], start_response: Callable[..., Any]
          ) -> Iterable[bytes] | None:
              # https://bugs.python.org/issue16679
              # https://github.com/bottlepy/bottle/blob/f9b1849db4/bottle.py#L984
              path = environ["PATH_INFO"].encode("latin-1").decode("utf-8", "ignore")
              if m := re.fullmatch(r"/livereload/([0-9]+)/[0-9]+", path):
                  epoch = int(m[1])
                  start_response("200 OK", [("Content-Type", "text/plain")])
      
                  def condition():
                      return self._visible_epoch > epoch
      
                  with self._epoch_cond:
                      if not condition():
                          # Stall the browser, respond as soon as there's something new.
                          # If there's not, respond anyway after a minute.
                          _log_poll_request(environ.get("HTTP_REFERER"), request_id=path)
                          self._epoch_cond.wait_for(
                              condition, timeout=self.poll_response_timeout
                          )
                      return [b"%d" % self._visible_epoch]
      
              if (path + "/").startswith(str(self.url.path)):
                  rel_file_path = path[len(str(self.url.path)) :]
      
                  if path.endswith("/"):
                      rel_file_path += "index.html"
                  # Prevent directory traversal - normalize the path.
                  rel_file_path = posixpath.normpath("/" + rel_file_path).lstrip("/")
                  file_path = self.root / rel_file_path
              elif path == "/":
                  start_response("302 Found", [("Location", str(self.url.path))])
                  return []
              else:
                  return None  # Not found
      
              # Wait until the ongoing rebuild (if any) finishes to not serve a half-built site.
              with self._epoch_cond:
                  self._epoch_cond.wait_for(lambda: self._visible_epoch == self._wanted_epoch)
                  epoch = self._visible_epoch
      
              try:
                  file = file_path.open("rb")
              except OSError:
                  if not path.endswith("/") and (file_path / "index.html").is_file():
                      start_response("302 Found", [("Location", str(URL(path).path) + "/")])
                      return []
                  return None  # Not found
      
              if self._watched_paths and file_path.suffix == ".html":
                  with file:
                      content = file.read()
                  script = _SCRIPT_TEMPLATE.substitute(epoch=epoch, request_id=_timestamp())
                  content = htmlfilters.inject_javascript(content, script)
                  file = io.BytesIO(content)
                  content_len = len(content)
              else:
                  content_len = file_path.stat().st_size
      
              content_type = _guess_type(file_path)
              headers = [("Content-Type", content_type), ("Content-Length", str(content_len))]
              start_response("200 OK", headers)
              return wsgiref.util.FileWrapper(file)
      

      unwatch

      unwatch(path: str | PathLike[str]) -> None
      

      Stop watching file changes for path.

      Raises if there was no corresponding watch call.

      Parameters:

      Name Type Description Default
      path str | PathLike[str]

      The path to unwatch

      required
      Source code in mkdocs_mknodes/liveserver.py
      168
      169
      170
      171
      172
      173
      174
      175
      176
      177
      178
      179
      180
      181
      def unwatch(self, path: str | os.PathLike[str]) -> None:
          """Release one `watch` registration of the given path.

          The path is resolved first. The observer is only unscheduled once the
          reference count of the path drops to zero.

          Raises if the path has no matching `watch` call.

          Args:
              path: The path to unwatch
          """
          resolved = str(upath.UPath(path).resolve())
          counts = self._watched_paths

          counts[resolved] -= 1
          if counts[resolved] > 0:
              # Other `watch` registrations for this path are still active.
              return
          del counts[resolved]
          watch_ref = self._watch_refs.pop(resolved)
          self.observer.unschedule(watch_ref)
      

      watch

      watch(path: str | PathLike[str], *, recursive: bool = True) -> None
      

      Add the 'path' to watched paths.

      Parameters:

      Name Type Description Default
      path str | PathLike[str]

      The path to watch.

      required
      recursive bool

      Whether to watch the path recursively

      True
      Source code in mkdocs_mknodes/liveserver.py
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      154
      155
      156
      157
      158
      159
      160
      161
      162
      163
      164
      165
      166
      def watch(self, path: str | os.PathLike[str], *, recursive: bool = True) -> None:
          """Register 'path' so that file events below it request a rebuild.

          The path is resolved first. Registering an already watched path only
          bumps its reference count.

          Args:
              path: The path to watch.
              recursive: Whether to watch the path recursively
          """
          resolved = str(upath.UPath(path).resolve())
          try:
              self._watched_paths[resolved] += 1
          except KeyError:
              # First registration of this path: schedule it below.
              self._watched_paths[resolved] = 1
          else:
              return

          def request_rebuild(event: watchdog.events.FileSystemEvent):
              # Only file events (not directory events) trigger a rebuild.
              if not event.is_directory:
                  logger.debug(str(event))
                  with self._rebuild_cond:
                      self._want_rebuild = True
                      self._rebuild_cond.notify_all()

          event_handler = watchdog.events.FileSystemEventHandler()
          event_handler.on_any_event = request_rebuild  # type: ignore[method-assign]
          logger.debug("Watching %r", resolved)
          watch_ref = self.observer.schedule(event_handler, resolved, recursive=recursive)
          self._watch_refs[resolved] = watch_ref