#!/usr/bin/python # The MIT License # # Copyright (c) 2012 Radu Cristescu # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. import struct import socket import sys import os import stat def getCstr(data, pos): end = data.find("\0", pos) if end == -1: return None, None string = data[pos:end] return string, pos + len(string) + 1 def fullPath(cwd, path): result = os.path.normpath(cwd + "/" + path) if path[0] != "/" else path ## http://stackoverflow.com/questions/7816818/why-doesnt-os-normapath-collapse-a-leading-double-slash ## It doesn't hurt having a double slash, but it looks ugly and inconsistent, so we clean it up if result[:2] == "//": result = result[1:] return result ## This appears to be the only TNFS thing that doesn't match Linux. I wonder why... class tnfs_flag(object): O_RDONLY = 0x0001 O_WRONLY = 0x0002 O_RDWR = 0x0003 O_APPEND = 0x0008 O_CREAT = 0x0100 O_TRUNC = 0x0200 O_EXCL = 0x0400 def flagsToTNFS(flags): tnfs_flags = 0 if flags & 0x03 == os.O_RDONLY: tnfs_flags |= tnfs_flag.O_RDONLY elif flags & 0x03 == os.O_WRONLY: tnfs_flags |= tnfs_flag.O_WRONLY elif flags & 0x03 == os.O_RDWR: tnfs_flags |= tnfs_flag.O_RDWR if flags & os.O_APPEND: tnfs_flags |= tnfs_flag.O_APPEND if flags & os.O_CREAT: tnfs_flags |= tnfs_flag.O_CREAT if flags & os.O_EXCL: tnfs_flags |= tnfs_flag.O_EXCL if flags & os.O_TRUNC: tnfs_flags |= tnfs_flag.O_TRUNC return tnfs_flags class MessageBase(object): TnfsCmd = None def __init__(self): self.setSession(None).setRetry(0).setCommand(self.TnfsCmd) def setSession(self, conn_id): self.conn_id = conn_id return self def setRetry(self, retry): self.retry = retry return self def setCommand(self, command): self.command = command return self def toWire(self): return struct.pack(" 22: pos = 22 user, pos = getCstr(data, pos) group, pos = getCstr(data, pos) else: user = "anonymous" group = "anonymous" else: mode = uid = gid = size = atime = mtime = ctime = None user = "anonymous" group = "anonymous" self.setMode(mode).setUID(uid).setGID(gid).setSize(size).setAtime(atime).setMtime(mtime).setCtime(ctime).setUser(user).setGroup(group) class LSeek(Command): TnfsCmd = 0x25 def __init__(self): Command.__init__(self) self.setFD(None).setSeekType(None).setSeekPosition(None) def setFD(self, fd): self.fd = fd return self def setSeekType(self, seektype): self.seektype = seektype return self def setSeekPosition(self, position): self.seekposition = position return self def do_DataToWire(self): return struct.pack(" 0: data = self._SendReceive(Read().setFD(fd).setSize(size if size <= 512 else 512)) r = ReadResponse().fromWire(data) if r.reply == 0: data_received.append(r.data) size -= len(r.data) else: break data_received = "".join(data_received) if (len(data_received) > 0): return 0, "".join(data_received) else: return r.reply, None def Write(self, fd, data_to_send): written = 0 while written < len(data_to_send): data = self._SendReceive(Write().setFD(fd).setData(data_to_send[written:written+512])) r = WriteResponse().fromWire(data) if r.reply != 0: break written += r.size return r.reply, written def Close(self, fd): data = self._SendReceive(Close().setFD(fd)) r = CloseResponse().fromWire(data) return r.reply def Stat(self, path): data = self._SendReceive(Stat().setPath(path)) r = StatResponse().fromWire(data) return r.reply, r def LSeek(self, fd, offset, whence): data = self._SendReceive(LSeek().setFD(fd).setSeekPosition(offset).setSeekType(whence)) r = LSeekResponse().fromWire(data) return r.reply def Unlink(self, path): data = self._SendReceive(Unlink().setPath(path)) r = UnlinkResponse().fromWire(data) return r.reply def Rename(self, source, destination): data = self._SendReceive(Rename().setSourcePath(source).setDestinationPath(destination)) r = RenameResponse().fromWire(data) return r.reply def ChMod(self, path, mode): data = self._SendReceive(ChMod().setPath(path).setMode(mode)) r = ChModResponse().fromWire(data) return r.reply def GetFilesystemSize(self): data = self._SendReceive(Size()) r = SizeResponse().fromWire(data) return r.reply, r.size def GetFilesystemFree(self): data = self._SendReceive(Free()) r = FreeResponse().fromWire(data) return r.reply, r.free #----------------------------------------------# def ListDir(self, path): contents = [] reply, handle = self.OpenDir(path) while reply == 0: reply, filename = self.ReadDir(handle) if reply == 0: contents.append(filename) if handle is not None: self.CloseDir(handle) return contents def GetFile(self, path): data = [] reply, fd = self.Open(path) if fd is None: return None while reply == 0: reply, chunk = self.Read(fd, 4096) if reply == 0: data.append(chunk) self.Close(fd) return "".join(data) def PutFile(self, path, data): reply, fd = self.Open(path, tnfs_flag.O_WRONLY | tnfs_flag.O_CREAT | tnfs_flag.O_TRUNC, 0600) if fd is None: print "Access denied" return pos = 0 while pos < len(data): self.Write(fd, data[pos:pos + 4096]) pos += 4096 self.Close(fd) if __name__ == "__main__": #RunTests() address = (sys.argv[1] if len(sys.argv) > 1 else 'vexed4.alioth.net', int(sys.argv[2]) if len(sys.argv) > 2 else 16384) print "Connecting to %s:%d..." % address command = ["ls"] cwd = "/" with Session(address) as S: print "Remote server is version", S.version while True: if len(command) == 0: pass elif command[0] == "quit": print "Bye!" break elif command[0] == "ls" or command[0] == "dir": long_listing = False if len(command) > 1 and command[1] == "-l": command.pop(1) long_listing = True path = os.path.normpath(cwd[1:] + "/" + command[1] if len(command) > 1 else cwd) files = sorted(S.ListDir(path)) _, size = S.GetFilesystemSize() _, free = S.GetFilesystemFree() listing = [] if not long_listing: for filename in files: listing.append(filename) else: listing_format = "{0:^15s} {1:0>5o} {2:>15d} {3:>5d} {4:>5d} {5}" listing_header = "{0:^15s} {1: ^5s} {2:^15s} {3:>5s} {4:>5s} {5}".format("TYPE", "PERM", "SIZE", "USER", "GROUP", "NAME") listing.append(listing_header) for filename in files: _, filestat = S.Stat(fullPath(path, filename)) if stat.S_ISREG(filestat.mode): filetype = "file" elif stat.S_ISDIR(filestat.mode): filetype = "directory" else: filetype = "other" details = listing_format.format(filetype, filestat.mode & 07777, filestat.size, filestat.uid, filestat.gid, filename) listing.append(details) print "Contents of %s:" % path for entry in listing: print " " + entry if size is not None: print "Size: %d KB" % size if free is not None: print "Free: %d KB" % free elif command[0] == "cd": if len(command) == 2: path = command[1] cwd = fullPath(cwd, path) else: print "Syntax: cd " elif command[0] == "pwd": print cwd elif command[0] == "mkdir": if len(command) == 2: path = fullPath(cwd, command[1]) S.MkDir(path) else: print "Syntax: mkdir " elif command[0] == "rmdir": if len(command) == 2: path = fullPath(cwd, command[1]) S.RmDir(path) else: print "Syntax: rmdir " elif command[0] == "get": if len(command) in (2, 3): print "Downloading '%s'" % command[1] source = fullPath(cwd, command[1]) destination = command[2] if len(command) == 3 else os.path.basename(source) data = S.GetFile(source) if data is not None: with open(destination, "w", 0600) as f: f.write(data) else: print "Download failed" else: print "Syntax: get []" elif command[0] == "put": if len(command) in (2, 3): print "Uploading '%s'" % command[1] source = command[1] destination = fullPath(cwd, (command[2] if len(command) == 3 else os.path.basename(source))) with open(source, "r") as f: data = f.read() S.PutFile(destination, data) else: print "Syntax: put []" else: print "Unknown command '%s'" % command try: command = raw_input(cwd + "> ").strip().split() except (EOFError, KeyboardInterrupt): print "quit" command = ["quit"]