
Add support for Checkpoints-only feature and multiple checkpoints per file #8

Open
wants to merge 9 commits into base: master
2 changes: 1 addition & 1 deletion README.rst
@@ -2,7 +2,7 @@
HDFS Contents Manager for Jupyter Notebooks
===========================================

A contents manager for Jupyter that uses the Hadoop File System (HDFS) to store Notebooks and files
A contents manager for Jupyter that uses the Hadoop File System (HDFS) to store Notebooks and files. The HDFSContentsManager class replaces all local filesystem storage with HDFS storage, while the HDFSCheckpoints class can be used on its own to move only Jupyter's checkpoint storage to HDFS.
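
For example, a minimal ``jupyter_notebook_config.py`` for the checkpoints-only mode could look like the following sketch, mirroring ``examples/example_hdfs_checkpoint_notebook_config.py`` added in this change::

    from hdfscontents.hdfscheckpoints import HDFSCheckpoints

    c.ContentsManager.checkpoints_class = HDFSCheckpoints
    c.HDFSCheckpoints.hdfs_namenode_host='<namenode_host_ip>'
    c.HDFSCheckpoints.hdfs_namenode_port=8020
    c.HDFSCheckpoints.root_dir='/user/hdfs_user'
    c.HDFSCheckpoints.hdfs_user='hdfs'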


Getting Started
9 changes: 9 additions & 0 deletions examples/example_hdfs_checkpoint_notebook_config.py
@@ -0,0 +1,9 @@
from hdfscontents.hdfscheckpoints import HDFSCheckpoints

# Tell IPython to use HDFSCheckpoints for checkpoint storage.
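# Notebooks and files themselves stay with the default (local) contents manager;
# only checkpoint storage is redirected to HDFS in this mode.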
c.ContentsManager.checkpoints_class = HDFSCheckpoints

c.HDFSCheckpoints.hdfs_namenode_host='<namenode_host_ip>'
c.HDFSCheckpoints.hdfs_namenode_port=8020
c.HDFSCheckpoints.root_dir='/user/hdfs_user'
c.HDFSCheckpoints.hdfs_user='hdfs'
9 changes: 9 additions & 0 deletions examples/example_hdfs_contents_notebook_config.py
@@ -0,0 +1,9 @@
from hdfscontents.hdfsmanager import HDFSContentsManager

# Tell IPython to use HDFSContentsManager as the ContentsManager.
c.NotebookApp.contents_manager_class = HDFSContentsManager

c.HDFSContentsManager.hdfs_namenode_host='<namenode_host_ip>'
c.HDFSContentsManager.hdfs_namenode_port=8020
c.HDFSContentsManager.root_dir='/user/hdfs_user'
c.HDFSContentsManager.hdfs_user='hdfs'
72 changes: 59 additions & 13 deletions hdfscontents/hdfscheckpoints.py
@@ -1,12 +1,15 @@
"""
HDFS-based Checkpoints implementations.
"""
import os
from hdfscontents.hdfsio import HDFSManagerMixin
from tornado.web import HTTPError
from notebook.services.contents.checkpoints import Checkpoints
from pydoop.hdfs.fs import hdfs as HDFS
import pydoop.hdfs

from traitlets import Unicode
from traitlets import Unicode, Instance, Integer, default
try: # new notebook
from notebook import _tz as tz
except ImportError: # old notebook
@@ -30,22 +33,25 @@ class HDFSCheckpoints(HDFSManagerMixin, Checkpoints):
""",
)

hdfs = None
root_dir = None

# ContentsManager-dependent checkpoint API
def create_checkpoint(self, contents_mgr, path):
"""Create a checkpoint."""
checkpoint_id = u'checkpoint'
src_path = contents_mgr._get_hdfs_path(path)
if hasattr(contents_mgr, '_get_os_path'):  # For checkpointing-only case
src_path = contents_mgr._get_os_path(path)
else:
src_path = contents_mgr._get_hdfs_path(path)
checkpoint_id = self.get_new_checkpoint_id(path)
dest_path = self.checkpoint_path(checkpoint_id, path)
self._copy(src_path, dest_path)
return self.checkpoint_model(checkpoint_id, dest_path)

def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
"""Restore a checkpoint."""
src_path = self.checkpoint_path(checkpoint_id, path)
dest_path = contents_mgr._get_hdfs_path(path)
if hasattr(contents_mgr, '_get_os_path'): # For checkpointing-only case
dest_path = contents_mgr._get_os_path(path)
else:
dest_path = contents_mgr._get_hdfs_path(path)
self._copy(src_path, dest_path)

# ContentsManager-independent checkpoint API
@@ -78,12 +84,15 @@ def list_checkpoints(self, path):
This contents manager supports multiple checkpoints per file; they are listed ordered by checkpoint id, newest first.
"""
path = path.strip('/')
checkpoint_id = "checkpoint"
cp_path = self.checkpoint_path(checkpoint_id, path)
if not self._hdfs_file_exists(cp_path):
return []
else:
return [self.checkpoint_model(checkpoint_id, cp_path)]
checkpoint_id_list = self.get_checkpoints_list(path)
# TODO: Sort by 'LastModified' instead of checkpoint id
checkpoint_id_list.sort(reverse=True)
checkpoints_list = []
for checkpoint_id in checkpoint_id_list:
cp_path = self.checkpoint_path(checkpoint_id, path)
if self._hdfs_file_exists(cp_path):
checkpoints_list.append(self.checkpoint_model(checkpoint_id, cp_path))
return checkpoints_list
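
# A possible follow-up for the TODO above: order checkpoints by their HDFS
# modification time instead of by checkpoint id. Only a sketch; the hypothetical
# helper assumes pydoop's get_path_info() exposes a 'last_mod' field
# (checkpoint_model() below obtains its stats from the same call).
def _sort_checkpoint_ids_by_mtime(self, path, checkpoint_ids):
    """Return checkpoint ids for `path`, newest first by modification time."""
    def mtime(checkpoint_id):
        cp_path = self.checkpoint_path(str(checkpoint_id), path)
        return self.hdfs.get_path_info(cp_path)['last_mod']
    return sorted(checkpoint_ids, key=mtime, reverse=True)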

# Checkpoint-related utilities
def checkpoint_path(self, checkpoint_id, path):
@@ -104,6 +113,43 @@ def checkpoint_path(self, checkpoint_id, path):
cp_path = os.path.join(cp_dir, filename)
return cp_path

def get_checkpoints_list(self, path):
"""Return the numeric checkpoint ids that exist for the file at `path`."""
checkpoints_list = []
parent, name = ('/' + path).rsplit('/', 1)
name, ext = os.path.splitext(name)
parent = parent.strip('/')
hdfs_path = self._get_hdfs_path(path=parent)
full_checkpoint_dir = os.path.join(hdfs_path, self.checkpoint_dir)
for filepath_dict in self.hdfs.list_directory(full_checkpoint_dir):
filepath = filepath_dict['name'].strip('/')
parent, filename = ('/' + filepath).rsplit('/', 1)
basename, ext = os.path.splitext(filename)
if '-' not in basename:
continue
filename, checkpoint_id = basename.rsplit('-', 1)
# Only count checkpoints that belong to this file
if filename == name and checkpoint_id.isnumeric():
checkpoints_list.append(int(checkpoint_id))
return checkpoints_list
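
# For illustration, assuming the '<name>-<id><ext>' checkpoint file naming parsed
# here: if the file's checkpoint directory holds 'analysis-1.ipynb' and
# 'analysis-2.ipynb', get_checkpoints_list('work/analysis.ipynb') returns [1, 2]
# and get_new_checkpoint_id('work/analysis.ipynb') below returns '3'.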

def get_new_checkpoint_id(self, path):
"""Return the checkpoint_id to use for a new checkpoint of `path`."""
path = path.strip('/')
parent, name = ('/' + path).rsplit('/', 1)
name, ext = os.path.splitext(name)
parent = parent.strip('/')
hdfs_path = self._get_hdfs_path(path=parent)
full_checkpoint_dir = os.path.join(hdfs_path, self.checkpoint_dir)
max_id = 0
for filepath_dict in self.hdfs.list_directory(full_checkpoint_dir):
filepath = filepath_dict['name'].strip('/')
parent, filename = ('/' + filepath).rsplit('/', 1)
basename, ext = os.path.splitext(filename)
if '-' not in basename:
continue
filename, checkpoint_id = basename.rsplit('-', 1)
if filename == name and checkpoint_id.isnumeric() and int(checkpoint_id) > max_id:
max_id = int(checkpoint_id)
return str(max_id + 1)

def checkpoint_model(self, checkpoint_id, hdfs_path):
"""construct the info dict for a given checkpoint"""
stats = self.hdfs.get_path_info(hdfs_path)
59 changes: 43 additions & 16 deletions hdfscontents/hdfsio.py
@@ -16,6 +16,7 @@
import nbformat

from pydoop.hdfs.path import split
from pydoop.hdfs.fs import hdfs as HDFS
from ipython_genutils.py3compat import str_to_unicode
from traitlets.config import Configurable
from traitlets import Bool, Integer, Unicode, default, Instance
@@ -41,14 +42,27 @@ def hdfs_copy_file(hdfs, src, dst):
def hdfs_copy_file(hdfs, src, dst):
chunk = 2 ** 16
# TODO: check if we need to specify replication
with hdfs.open_file(dst, 'w') as f1:
with hdfs.open_file(src, 'r') as f2:
while True:
out = f2.read(chunk)
if len(out) == 0:
break
f1.write(out)
hdfs.chmod(dst, 0o0770)

# TODO: Find a better way to support the checkpointing-only use case
try:
with hdfs.open_file(dst, 'w') as f1:
with hdfs.open_file(src, 'r') as f2:
while True:
out = f2.read(chunk)
if len(out) == 0:
break
f1.write(out)
# Fall back to reading the source from the local filesystem
# (checkpoints-only case, where the notebook itself is not stored on HDFS).
except Exception:
with hdfs.open_file(dst, 'w') as f1:
with open(src, 'rb') as f2:
while True:
out = f2.read(chunk)
if len(out) == 0:
break
f1.write(out)

hdfs.chmod(dst, 0o0777)
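
# Sketch of an alternative to the broad fallback above: decide up front whether
# the source lives on the local filesystem (checkpoints-only case) or on HDFS.
# This helper is hypothetical (not part of the module) and assumes a local path
# and an HDFS path never refer to the same string.
import os  # may already be imported at the top of hdfsio.py

def hdfs_copy_file_explicit(hdfs, src, dst, chunk=2 ** 16):
    # Pick the right reader for the source, then stream chunks into HDFS.
    src_is_local = os.path.isfile(src)
    open_src = (lambda: open(src, 'rb')) if src_is_local else (lambda: hdfs.open_file(src, 'r'))
    with hdfs.open_file(dst, 'w') as fdst, open_src() as fsrc:
        while True:
            out = fsrc.read(chunk)
            if len(out) == 0:
                break
            fdst.write(out)
    hdfs.chmod(dst, 0o0777)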


def hdfs_replace_file(hdfs, src, dst):
@@ -57,7 +71,7 @@ def hdfs_replace_file(hdfs, src, dst):
"""
hdfs.delete(dst)
hdfs.move(src, hdfs, dst)
hdfs.chmod(dst, 0o0770)
hdfs.chmod(dst, 0o0777)


def hdfs_file_exists(hdfs, hdfs_path):
@@ -95,8 +109,8 @@ def atomic_writing(hdfs, hdfs_path):
# Flush to disk
fileobj.flush()
fileobj.close()
hdfs.chmod(hdfs_path, 0o0770)
hdfs.chmod(hdfs_path, 0o0777)

# Written successfully, now remove the backup copy
if hdfs_file_exists(hdfs, tmp_path):
hdfs.delete(tmp_path)
@@ -130,7 +144,7 @@ def _simple_writing(hdfs, hdfs_path):
# Flush to disk
fileobj.flush()
fileobj.close()
hdfs.chmod(hdfs_path, 0o0770)
hdfs.chmod(hdfs_path, 0o0777)


class HDFSManagerMixin(Configurable):
@@ -149,10 +163,23 @@ class HDFSManagerMixin(Configurable):
log : logging.Logger
"""

hdfs_namenode_host = Unicode(u'localhost', config=True, help='The HDFS namenode host')
hdfs_namenode_port = Integer(9000, config=True, help='The HDFS namenode port')
hdfs_user = Unicode(None, allow_none=True, config=True, help='The HDFS user name')

root_dir = Unicode(u'/', config=True, help='The HDFS root directory to use')

# The pydoop HDFS connection object used to interact with HDFS cluster.
hdfs = Instance(HDFS, config=True)

use_atomic_writing = Bool(True, config=True, help=
"""By default notebooks are saved on disk on a temporary file and then if succefully written, it replaces the old ones.
This procedure, namely 'atomic_writing', causes some bugs on file system whitout operation order enforcement (like some networked fs).
If set to False, the new notebook is written directly on the old one which could fail (eg: full filesystem or quota )""")
This procedure, namely 'atomic_writing', causes some bugs on file system whitout operation order enforcement (like some networked fs).
If set to False, the new notebook is written directly on the old one which could fail (eg: full filesystem or quota )""")

@default('hdfs')
def _default_hdfs(self):
return HDFS(host=self.hdfs_namenode_host, port=self.hdfs_namenode_port, user=self.hdfs_user) # groups=None

def _hdfs_dir_exists(self, hdfs_path):
"""Does the directory exists in HDFS filesystem?
Expand Down Expand Up @@ -181,7 +208,7 @@ def _hdfs_ensure_dir_exists(self, hdfs_path):
if not self.hdfs.exists(hdfs_path):
try:
self.hdfs.create_directory(hdfs_path)
self.hdfs.chmod(hdfs_path, 0o0770)
self.hdfs.chmod(hdfs_path, 0o0777)
except OSError as e:
if e.errno != errno.EEXIST:
raise
@@ -236,7 +263,7 @@ def _hdfs_move_file(self, src, dst):
self.hdfs.move(src, self.hdfs, dst)
# The pydoop move changes the permissions back to default!
for p in self.hdfs.walk(dst):
self.hdfs.chmod(p['name'], 0o0770)
self.hdfs.chmod(p['name'], 0o0777)

def _hdfs_copy_file(self, src, dst):
hdfs_copy_file(self.hdfs, src, dst)
35 changes: 18 additions & 17 deletions hdfscontents/hdfsmanager.py
@@ -16,7 +16,7 @@
from tornado.web import HTTPError
import mimetypes
import nbformat
from traitlets import Instance, Integer, Unicode, default
from traitlets import default

try: # PY3
from base64 import encodebytes, decodebytes
@@ -29,25 +29,26 @@ class HDFSContentsManager(ContentsManager, HDFSManagerMixin):
ContentsManager that persists to the Hadoop File System (HDFS) instead of the local filesystem.
"""

hdfs_namenode_host = Unicode(u'localhost', config=True, help='The HDFS namenode host')
hdfs_namenode_port = Integer(9000, config=True, help='The HDFS namenode port')
hdfs_user = Unicode(None, allow_none=True, config=True, help='The HDFS user name')

root_dir = Unicode(u'/', config=True, help='The HDFS root directory to use')

# The pydoop HDFS connection object used to interact with HDFS cluster.
hdfs = Instance(HDFS, config=True)

@default('hdfs')
def _default_hdfs(self):
return HDFS(host=self.hdfs_namenode_host, port=self.hdfs_namenode_port, user=self.hdfs_user) # groups=None

@default('checkpoints_class')
def _checkpoints_class_default(self):
# TODO: a better way to pass hdfs and root_dir?
HDFSCheckpoints.hdfs = self.hdfs
HDFSCheckpoints.root_dir = self.root_dir
return HDFSCheckpoints

@default('checkpoints_kwargs')
def _default_checkpoints_kwargs(self):
klass = HDFSContentsManager
# The parent class names this default generator differently across notebook
# versions; try both.
try:
kw = super(klass, self)._checkpoints_kwargs_default()
except AttributeError:
kw = super(klass, self)._default_checkpoints_kwargs()

kw.update({
'hdfs_namenode_host': self.hdfs_namenode_host,
'hdfs_namenode_port': self.hdfs_namenode_port,
'hdfs_user': self.hdfs_user,
'root_dir': self.root_dir,
})
return kw
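
# With the kwargs above, configuring the contents manager alone is enough: its
# HDFS connection settings are passed on to HDFSCheckpoints automatically, so a
# jupyter_notebook_config.py only needs lines like
#     c.HDFSContentsManager.hdfs_namenode_host = '<namenode_host_ip>'
#     c.HDFSContentsManager.root_dir = '/user/hdfs_user'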

# ContentsManager API part 1: methods that must be
# implemented in subclasses.
