44##
55"""
66Create and interface with PostgreSQL clusters.
7+
8+ Primarily, this means starting and stopping the postgres daemon and modifying
9+ the configuration file.
710"""
811import sys
912import os
13+ import errno
14+ import signal
15+ import time
16+ import io
1017import subprocess as sp
1118import warnings
19+ import tempfile
20+ from contextlib import closing
21+
1222from . import api as pg_api
1323from . import configfile
24+ from . import pg_config
25+ from . import exceptions as pg_exc
26+ # Used for connection ready check.
27+ from .driver import pgapi as pgapi
1428
1529DEFAULT_CONFIG_FILENAME = 'postgresql.conf'
1630DEFAULT_HBA_FILENAME = 'pg_hba.conf'
31+ DEFAULT_PID_FILENAME = 'postmaster.pid'
1732
1833initdb_option_map = {
1934 'encoding' : '-E' ,
2540 'time' : '--lc-time' ,
2641 'authentication' : '-A' ,
2742 'superusername' : '-U' ,
28- 'superuserpass' : '--pwfile' ,
29- 'verbose' : '-d' ,
30- 'sources' : '-L' ,
3143}
3244
3345class Cluster (pg_api .Cluster ):
3446 """
47+ Interface to a PostgreSQL cluster.
48+
49+ Provides mechanisms to start, stop, restart, kill, drop, and configure a
50+ cluster(data directory).
3551 """
52+ def get_pid_from_file (self ):
53+ """
54+ The current pid from the postmaster.pid file.
55+ """
56+ try :
57+ with closing (open (os .path .join (self .data_directory , DEFAULT_PID_FILENAME ))) as f :
58+ return int (f .readline ())
59+ except IOError as e :
60+ if e .errno in (errno .EIO , errno .ENOENT ):
61+ return None
62+
3663 @property
3764 def settings (self ):
38- if not hasattr (self , '_settings' ) :
65+ if getattr (self , '_settings' , None ) is None :
3966 self ._settings = configfile .ConfigFile (self .pgsql_dot_conf )
4067 return self ._settings
4168
42- @classmethod
43- def create (cls ,
44- path : "path to give to initdb" ,
45- initdb = 'initdb' ,
69+ @property
70+ def hba_file (self ):
71+ return self .settings .get (
72+ 'hba_file' ,
73+ os .path .join (self .data_directory , DEFAULT_HBA_FILENAME )
74+ )
75+
76+ def __init__ (self ,
77+ data_directory : "path to the data directory" ,
78+ pg_config_path : "path to pg_config to use" = 'pg_config' ,
79+ pg_config_data : "pg_config data to use; uses _path if None" = None
80+ ):
81+ self .data_directory = os .path .abspath (data_directory )
82+ self .pgsql_dot_conf = os .path .join (data_directory , DEFAULT_CONFIG_FILENAME )
83+ if pg_config_data is None :
84+ self .config = pg_config .dictionary (pg_config_path )
85+ else :
86+ self .config = pg_config_data
87+
88+ self .postgres_path = os .path .join (self .config ['bindir' ], 'postmaster' )
89+ if not os .path .exists (self .postgres_path ):
90+ self .postgres_path = os .path .join (self .config ['bindir' ], 'postgres' )
91+ self .daemon_process = None
92+ self .last_known_pid = self .get_pid_from_file ()
93+
94+ def __repr__ (self ):
95+ return "%s.%s(%r, %r)" % (
96+ type (self ).__module__ ,
97+ type (self ).__name__ ,
98+ self .data_directory ,
99+ self .postgres_path ,
100+ )
101+
102+ def init (self ,
103+ initdb : "explicitly state the initdb binary to use" = None ,
104+ verbose = False ,
105+ superuserpass = None ,
46106 ** kw
47107 ):
48108 """
49- Create the cluster at the given `datadir ` using the
109+ Create the cluster at the given `data_directory ` using the
50110 provided keyword parameters as options to the command.
51111
52112 `command_option_map` provides the mapping of keyword arguments
@@ -55,17 +115,24 @@ def create(cls,
55115 # Transform keyword options into command options for the executable.
56116 opts = []
57117 for x in kw :
58- if x in ('nowait' , ' logfile' , 'extra_arguments' ):
118+ if x in ('logfile' , 'extra_arguments' ):
59119 continue
60- if x not in self . command_option_map :
120+ if x not in initdb_option_map :
61121 raise TypeError ("got an unexpected keyword argument %r" % (x ,))
62- opts .append (self . command_option_map [x ])
122+ opts .append (initdb_option_map [x ])
63123 opts .append (kw [x ])
64- nowait = kw .get ('nowait' , False )
65124 logfile = kw .get ('logfile' , sp .PIPE )
66- extra_args = kw .get ('extra_arguments' , ())
125+ extra_args = tuple ([
126+ str (x ) for x in kw .get ('extra_arguments' , ())
127+ ])
128+ verbose = (initdb_option_map ['verbose' ],) if verbose is False else ()
129+ if superuserpass is not None :
130+ pass
67131
68- cmd = (self ._path , '-D' , datadir ) + tuple (opts ) + extra_args
132+ if initdb is None :
133+ initdb = os .path .join (self .config ['bindir' ], 'initdb' )
134+
135+ cmd = (initdb , '-D' , self .data_directory ) + verbose + tuple (opts ) + extra_args
69136 p = sp .Popen (
70137 cmd ,
71138 close_fds = True ,
@@ -75,48 +142,191 @@ def create(cls,
75142 )
76143 p .stdin .close ()
77144
78- if nowait :
79- return p
80-
81- try :
82- rc = p .wait ()
83- except KeyboardInterrupt :
84- os .kill (p .pid , signal .SIGINT )
85- raise
86-
145+ rc = p .wait ()
87146 if rc != 0 :
88- raise InitDBError (cmd , rc , p .stderr .read ())
147+ raise pg_exc . InitDBError (cmd , rc , p .stderr .read ())
89148
90- def __init__ (self , path , postgres_path = 'postgres' ):
149+ def drop (self ):
91150 """
151+ Stop the cluster and remove it from the filesystem
92152 """
93- if not os .path .isdir (path ):
94- raise ValueError ("cluster at %s does not exist" % (path ,))
95- self .path = path
96-
97- def __repr__ (self ):
98- return "%s.%s(%r, %r)" % (
99- type (self ).__module__ ,
100- type (self ).__name__ ,
101- self .path ,
102- self .postgres_path ,
103- )
104-
105- def drop (self ):
106- 'Stop the cluster and delete it from the filesystem'
107153 if self .running ():
108- self .stop ()
154+ self .kill ()
109155 # Really, using rm -rf would be the best, but use this for portability.
110- for root , dirs , files in os .walk (self .control . data , topdown = False ):
156+ for root , dirs , files in os .walk (self .data_directory , topdown = False ):
111157 for name in files :
112158 os .remove (os .path .join (root , name ))
113159 for name in dirs :
114160 os .rmdir (os .path .join (root , name ))
115- os .rmdir (self .control . data )
161+ os .rmdir (self .data_directory )
116162
117- def start (self ):
163+ def start (self ,
164+ logfile : "Where to send stderr" = sp .PIPE ,
165+ settings : "Mapping of runtime parameters" = None
166+ ):
118167 """
119168 Start the cluster
120169 """
170+ if self .running ():
171+ return None
172+ cmd = [self .postgres_path , '-D' , self .data_directory ]
173+ if settings is not None :
174+ for k ,v in settings :
175+ cmd .append ('--{k}={v}' .format (k = k ,v = v ))
176+
177+ p = sp .Popen (
178+ cmd ,
179+ close_fds = True ,
180+ stdout = logfile ,
181+ stderr = sp .STDOUT ,
182+ stdin = sp .PIPE ,
183+ )
184+ p .stdin .close ()
185+ self .last_known_pid = p .pid
186+ self .daemon_process = p
187+
188+ def stop (self ):
189+ """
190+ Stop the cluster gracefully(SIGTERM).
191+
192+ Does *not* wait for shutdown.
193+ """
194+ pid = self .get_pid_from_file ()
195+ if pid is not None :
196+ os .kill (pid , signal .SIGTERM )
197+
198+ def restart (self , timeout = 10 ):
199+ """
200+ Restart the cluster gracefully.
201+
202+ This provides a higher level interface to stopping then starting the
203+ cluster. It will
204+ """
205+ if self .running ():
206+ self .stop ()
207+ self .wait_until_stopped (timeout = timeout )
208+ if not self .running ():
209+ raise ClusterError ("failed to shutdown cluster" )
210+ self .start ()
211+ self .wait_until_started (timeout = timeout )
212+
213+ def kill (self ):
214+ """
215+ Stop the cluster immediately(SIGKILL).
216+
217+ Does *not* wait for shutdown.
218+ """
219+ pid = self .get_pid_from_file ()
220+ if pid is not None :
221+ os .kill (pid , signal .SIGKILL )
222+
223+ def initialized (self ):
224+ """
225+ Whether or not the data directory *appears* to be a valid cluster.
226+ """
227+ if os .path .isdir (self .data_directory ) and \
228+ os .path .exists (self .pgsql_dot_conf ) and \
229+ os .path .isdir (os .path .join (self .data_directory , 'base' )):
230+ return True
231+ return False
232+
233+ def running (self ):
234+ """
235+ Whether or not the postmaster is running.
236+
237+ This does *not* mean the cluster is accepting connections.
238+ """
239+ pid = self .get_pid_from_file ()
240+ if pid is None :
241+ return False
242+ return os .kill (pid , signal .SIG_DFL ) == 0
243+
244+ def ready_for_connections (self ):
245+ """
246+ If the daemon is running, and is not in startup mode.
247+
248+ This only works for clusters configured for TCP/IP connections.
249+ """
250+ if not self .running ():
251+ return False
252+ d = self .settings .getset ((
253+ 'listen_addresses' ,
254+ 'port' ,
255+ ))
256+ if 'listen_addresses' not in d :
257+ raise ClusterError (
258+ "postmaster pings can only be made to TCP/IP configurations"
259+ )
260+
261+ # Prefer localhost over other addresses.
262+ addrs = d ['listen_addresses' ].split (',' )
263+ if 'localhost' in addrs or '*' in addrs :
264+ host = 'localhost'
265+ elif '127.0.0.1' in addrs :
266+ host = '127.0.0.1'
267+ elif '::1' in addrs :
268+ host = '::1'
269+
270+ try :
271+ pg .connect (
272+ user = 'ping' ,
273+ host = host ,
274+ port = int (d .get ('port' ) or 5432 ),
275+ database = 'template1' ,
276+ ).close ()
277+ except pg_exc .CannotConnectNowError :
278+ return False
279+ except pg_exc .Error :
280+ return True
281+ except :
282+ return False
283+
284+ return True
285+
286+ def wait_until_started (self ,
287+ timeout : "how long to wait before throwing a timeout exception" = 10 ,
288+ delay : "how long to sleep before re-testing" = 0.1
289+ ):
290+ """
291+ After the `start` method is used, this can be ran in order to block until
292+ the cluster is ready for use.
293+ """
294+ start = time .time ()
295+ while True :
296+ if not self .running ():
297+ raise pg_exc .ClusterNotRunningError ("postres daemon has not been started" )
298+
299+ if self .ready_for_connections ():
300+ return
301+
302+ if time .time () - start >= timeout :
303+ raise pg_exc .ClusterTimeoutError (
304+ "start operation timed out: %d seconds elapsed" % (
305+ timeout
306+ )
307+ )
308+ time .sleep (delay )
309+
310+ def wait_until_stopped (self ,
311+ timeout : "how long to wait before throwing a timeout exception" = 10 ,
312+ delay : "how long to sleep before re-testing" = 0.1
313+ ):
314+ """
315+ After the `stop` method is used, this can be ran in order to block until
316+ the cluster is shutdown.
317+
318+ Additionally, catching `ClusterTimeoutError` exceptions would be a
319+ starting point for making decisions about whether or not to issue a kill
320+ to the postgres daemon.
321+ """
322+ start = time .time ()
323+ while self .running ():
324+ if time .time () - start >= timeout :
325+ raise pg_exc .ClusterTimeoutError (
326+ "stop operation timed out: %d seconds elapsed" % (
327+ timeout
328+ )
329+ )
330+ time .sleep (delay )
121331##
122332# vim: ts=3:sw=3:noet:
0 commit comments