Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

451

452

453

454

455

456

457

458

459

460

461

462

463

464

465

466

467

468

469

470

471

472

473

474

475

476

477

478

479

480

481

482

483

484

485

486

487

488

489

490

491

492

493

494

495

496

497

498

499

500

501

502

503

504

505

506

507

508

509

510

511

512

513

514

515

516

517

518

519

520

521

522

523

524

525

526

527

528

529

530

531

532

533

534

535

536

537

538

539

540

541

542

543

544

545

546

547

548

549

550

551

552

553

554

555

556

557

558

559

560

561

562

563

564

565

566

567

568

569

570

571

572

573

574

575

576

577

578

579

580

581

582

583

584

585

586

587

588

589

590

591

592

593

594

595

596

597

598

599

600

601

602

603

604

605

606

607

608

609

610

611

612

613

614

615

616

617

618

619

620

621

622

623

624

625

626

627

628

629

630

631

632

633

634

635

636

637

638

639

640

641

642

643

644

645

646

647

648

649

650

651

652

653

654

655

656

657

658

659

660

661

662

663

664

665

666

667

668

669

670

671

672

673

674

675

676

677

678

679

680

681

682

683

684

685

686

687

688

689

690

691

692

693

694

695

696

697

698

699

700

701

702

703

704

705

706

707

708

709

710

711

712

713

714

715

716

717

718

719

720

721

722

723

724

725

726

727

728

729

730

731

732

733

734

735

736

737

738

739

740

741

742

743

744

745

746

747

748

749

750

751

752

753

754

755

756

757

758

759

760

761

762

763

764

765

766

767

768

769

770

771

772

773

774

775

776

777

778

779

780

781

782

783

784

785

786

787

788

789

790

791

792

793

794

795

796

797

798

799

800

801

802

803

804

805

806

807

808

809

810

811

812

813

814

815

816

817

818

819

820

821

822

823

824

825

826

827

828

829

830

831

832

833

834

835

836

837

838

839

840

841

842

843

844

845

846

847

848

849

850

851

852

853

854

855

856

857

858

859

860

861

862

863

864

865

866

867

868

869

870

871

872

873

874

875

876

877

878

879

880

881

882

883

884

885

886

887

888

889

890

891

892

893

894

895

896

897

898

899

900

901

902

903

904

905

906

907

908

909

910

911

912

913

914

915

916

917

918

919

920

921

922

923

924

925

926

927

928

929

930

931

932

933

934

935

936

937

938

939

940

941

942

943

944

945

946

947

948

949

950

951

952

953

954

955

956

957

958

959

960

961

962

963

964

965

966

967

968

969

970

971

972

973

974

975

976

977

978

979

980

981

982

983

984

985

986

987

988

989

990

991

992

993

994

995

996

997

998

999

1000

1001

1002

1003

1004

1005

1006

1007

1008

1009

1010

1011

1012

1013

1014

1015

1016

1017

1018

1019

1020

1021

1022

1023

1024

1025

1026

1027

1028

1029

1030

1031

1032

1033

1034

1035

1036

1037

1038

1039

1040

1041

1042

1043

1044

1045

1046

1047

1048

1049

1050

1051

1052

1053

1054

1055

1056

1057

1058

1059

1060

1061

1062

1063

1064

1065

1066

1067

1068

1069

1070

1071

1072

1073

1074

1075

1076

1077

1078

1079

1080

1081

1082

1083

1084

1085

1086

1087

1088

1089

1090

1091

1092

1093

1094

1095

1096

1097

1098

1099

1100

1101

1102

1103

1104

1105

1106

1107

1108

1109

1110

1111

1112

1113

1114

1115

1116

1117

1118

1119

1120

1121

1122

1123

1124

1125

1126

1127

1128

1129

1130

1131

1132

1133

1134

1135

1136

1137

1138

1139

1140

1141

1142

1143

1144

1145

1146

1147

1148

1149

1150

1151

1152

1153

1154

1155

1156

1157

1158

1159

1160

1161

1162

1163

1164

1165

1166

1167

1168

1169

1170

1171

1172

1173

1174

1175

1176

1177

1178

1179

1180

1181

1182

1183

1184

1185

1186

1187

1188

1189

1190

1191

1192

1193

1194

1195

1196

1197

1198

1199

1200

1201

1202

1203

1204

1205

1206

1207

1208

1209

1210

1211

1212

1213

1214

1215

1216

1217

1218

1219

1220

1221

1222

1223

1224

1225

1226

1227

1228

1229

1230

1231

1232

1233

1234

1235

1236

1237

1238

1239

1240

1241

1242

1243

1244

1245

1246

1247

1248

1249

1250

1251

1252

1253

1254

1255

1256

1257

1258

1259

1260

1261

1262

1263

1264

1265

1266

1267

1268

# Natural Language Toolkit: Hidden Markov Model 

# 

# Copyright (C) 2001-2012 NLTK Project 

# Author: Trevor Cohn <tacohn@csse.unimelb.edu.au> 

#         Philip Blunsom <pcbl@csse.unimelb.edu.au> 

#         Tiago Tresoldi <tiago@tresoldi.pro.br> (fixes) 

#         Steven Bird <sb@csse.unimelb.edu.au> (fixes) 

#         Joseph Frazee <jfrazee@mail.utexas.edu> (fixes) 

# URL: <http://www.nltk.org/> 

# For license information, see LICENSE.TXT 

 

""" 

Hidden Markov Models (HMMs) are largely used to assign the correct label sequence

to sequential data or assess the probability of a given label and data 

sequence. These models are finite state machines characterised by a number of 

states, transitions between these states, and output symbols emitted while in 

each state. The HMM is an extension to the Markov chain, where each state 

corresponds deterministically to a given event. In the HMM the observation is 

a probabilistic function of the state. HMMs share the Markov chain's 

assumption, being that the probability of transition from one state to another 

only depends on the current state - i.e. the series of states that led to the 

current state are not used. They are also time invariant. 

 

The HMM is a directed graph, with probability weighted edges (representing the 

probability of a transition between the source and sink states) where each 

vertex emits an output symbol when entered. The symbol (or observation) is 

non-deterministically generated. For this reason, knowing that a sequence of 

output observations was generated by a given HMM does not mean that the 

corresponding sequence of states (and what the current state is) is known. 

This is the 'hidden' in the hidden markov model. 

 

Formally, a HMM can be characterised by: 

 

- the output observation alphabet. This is the set of symbols which may be 

  observed as output of the system. 

- the set of states. 

- the transition probabilities *a_{ij} = P(s_t = j | s_{t-1} = i)*. These 

  represent the probability of transition to each state from a given state. 

- the output probability matrix *b_i(k) = P(X_t = o_k | s_t = i)*. These 

  represent the probability of observing each symbol in a given state. 

- the initial state distribution. This gives the probability of starting 

  in each state. 

 

To ground this discussion, take a common NLP application, part-of-speech (POS) 

tagging. An HMM is desirable for this task as the highest probability tag 

sequence can be calculated for a given sequence of word forms. This differs 

from other tagging techniques which often tag each word individually, seeking 

to optimise each individual tagging greedily without regard to the optimal 

combination of tags for a larger unit, such as a sentence. The HMM does this 

with the Viterbi algorithm, which efficiently computes the optimal path 

through the graph given the sequence of word forms.

 

In POS tagging the states usually have a 1:1 correspondence with the tag 

alphabet - i.e. each state represents a single tag. The output observation 

alphabet is the set of word forms (the lexicon), and the remaining three 

parameters are derived by a training regime. With this information the 

probability of a given sentence can be easily derived, by simply summing the 

probability of each distinct path through the model. Similarly, the highest 

probability tagging sequence can be derived with the Viterbi algorithm, 

yielding a state sequence which can be mapped into a tag sequence. 

 

This discussion assumes that the HMM has been trained. This is probably the 

most difficult task with the model, and requires either MLE estimates of the 

parameters or unsupervised learning using the Baum-Welch algorithm, a variant 

of EM. 

 

For more information, please consult the source code for this module, 

which includes extensive demonstration code. 

""" 

from __future__ import print_function 

 

import re 

import types 

from numpy import zeros, ones, float32, float64, log2, hstack, array, argmax 

 

from nltk.probability import (FreqDist, ConditionalFreqDist, 

                              ConditionalProbDist, DictionaryProbDist, 

                              DictionaryConditionalProbDist, 

                              LidstoneProbDist, MutableProbDist, 

                              MLEProbDist, UniformProbDist) 

from nltk.metrics import accuracy 

from nltk.util import LazyMap, LazyConcatenation, LazyZip 

 

from nltk.tag.api import TaggerI, HiddenMarkovModelTaggerTransformI 

 

# _NINF = float('-inf')  # won't work on Windows 

_NINF = float('-1e300') 

 

_TEXT = 0  # index of text in a tuple 

_TAG = 1   # index of tag in a tuple 

 

class HiddenMarkovModelTagger(TaggerI): 

    """ 

    Hidden Markov model class, a generative model for labelling sequence data. 

    These models define the joint probability of a sequence of symbols and 

    their labels (state transitions) as the product of the starting state 

    probability, the probability of each state transition, and the probability 

    of each observation being generated from each state. This is described in 

    more detail in the module documentation. 

 

    This implementation is based on the HMM description in Chapter 8, Huang, 

    Acero and Hon, Spoken Language Processing and includes an extension for 

    training shallow HMM parsers or specialized HMMs as in Molina et.

    al, 2002.  A specialized HMM modifies training data by applying a 

    specialization function to create a new training set that is more 

    appropriate for sequential tagging with an HMM.  A typical use case is 

    chunking. 

 

    :param symbols: the set of output symbols (alphabet) 

    :type symbols: seq of any 

    :param states: a set of states representing state space 

    :type states: seq of any 

    :param transitions: transition probabilities; Pr(s_i | s_j) is the
        probability of transition to state i given the model is in
        state j

    :type transitions: ConditionalProbDistI 

    :param outputs: output probabilities; Pr(o_k | s_i) is the probability 

        of emitting symbol k when entering state i 

    :type outputs: ConditionalProbDistI 

    :param priors: initial state distribution; Pr(s_i) is the probability 

        of starting in state i 

    :type priors: ProbDistI 

    :param transform: an optional function for transforming training 

        instances, defaults to the identity function. 

    :type transform: function or HiddenMarkovModelTaggerTransform 

    """ 

    def __init__(self, symbols, states, transitions, outputs, priors, **kwargs): 

        self._symbols = list(set(symbols)) 

        self._states = list(set(states)) 

        self._transitions = transitions 

        self._outputs = outputs 

        self._priors = priors 

        self._cache = None 

 

        self._transform = kwargs.get('transform', IdentityTransform()) 

        if isinstance(self._transform, types.FunctionType): 

            self._transform = LambdaTransform(self._transform) 

        elif not isinstance(self._transform, 

                            HiddenMarkovModelTaggerTransformI): 

            raise 

 

    @classmethod 

    def _train(cls, labeled_sequence, test_sequence=None, 

                        unlabeled_sequence=None, **kwargs): 

        transform = kwargs.get('transform', IdentityTransform()) 

        if isinstance(transform, types.FunctionType): 

            transform = LambdaTransform(transform) 

        elif \ 

        not isinstance(transform, HiddenMarkovModelTaggerTransformI): 

            raise 

 

        estimator = kwargs.get('estimator', lambda fd, bins: \ 

                                            LidstoneProbDist(fd, 0.1, bins)) 

 

        labeled_sequence = LazyMap(transform.transform, labeled_sequence) 

        symbols = list(set(word for sent in labeled_sequence 

            for word, tag in sent)) 

        tag_set = list(set(tag for sent in labeled_sequence 

            for word, tag in sent)) 

 

        trainer = HiddenMarkovModelTrainer(tag_set, symbols) 

        hmm = trainer.train_supervised(labeled_sequence, estimator=estimator) 

        hmm = cls(hmm._symbols, hmm._states, hmm._transitions, hmm._outputs, 

                  hmm._priors, transform=transform) 

 

        if test_sequence: 

            hmm.test(test_sequence, verbose=kwargs.get('verbose', False)) 

 

        if unlabeled_sequence: 

            max_iterations = kwargs.get('max_iterations', 5) 

            hmm = trainer.train_unsupervised(unlabeled_sequence, model=hmm, 

                max_iterations=max_iterations) 

            if test_sequence: 

                hmm.test(test_sequence, verbose=kwargs.get('verbose', False)) 

 

        return hmm 

 

    @classmethod 

    def train(cls, labeled_sequence, test_sequence=None, 

                       unlabeled_sequence=None, **kwargs): 

        """ 

        Train a new HiddenMarkovModelTagger using the given labeled and 

        unlabeled training instances. Testing will be performed if test 

        instances are provided. 

 

        :return: a hidden markov model tagger 

        :rtype: HiddenMarkovModelTagger 

        :param labeled_sequence: a sequence of labeled training instances, 

            i.e. a list of sentences represented as tuples 

        :type labeled_sequence: list(list) 

        :param test_sequence: a sequence of labeled test instances 

        :type test_sequence: list(list) 

        :param unlabeled_sequence: a sequence of unlabeled training instances, 

            i.e. a list of sentences represented as words 

        :type unlabeled_sequence: list(list) 

        :param transform: an optional function for transforming training 

            instances, defaults to the identity function, see ``transform()`` 

        :type transform: function 

        :param estimator: an optional function or class that maps a 

            condition's frequency distribution to its probability 

            distribution, defaults to a Lidstone distribution with gamma = 0.1 

        :type estimator: class or function 

        :param verbose: boolean flag indicating whether training should be 

            verbose or include printed output 

        :type verbose: bool 

        :param max_iterations: number of Baum-Welch interations to perform 

        :type max_iterations: int 

        """ 

        return cls._train(labeled_sequence, test_sequence, 

                              unlabeled_sequence, **kwargs) 

 

    def probability(self, sequence): 

        """ 

        Returns the probability of the given symbol sequence. If the sequence 

        is labelled, then returns the joint probability of the symbol, state 

        sequence. Otherwise, uses the forward algorithm to find the 

        probability over all label sequences. 

 

        :return: the probability of the sequence 

        :rtype: float 

        :param sequence: the sequence of symbols which must contain the TEXT 

            property, and optionally the TAG property 

        :type sequence:  Token 

        """ 

        return 2**(self.log_probability(self._transform.transform(sequence))) 

 

    def log_probability(self, sequence): 

        """ 

        Returns the log-probability of the given symbol sequence. If the 

        sequence is labelled, then returns the joint log-probability of the 

        symbol, state sequence. Otherwise, uses the forward algorithm to find 

        the log-probability over all label sequences. 

 

        :return: the log-probability of the sequence 

        :rtype: float 

        :param sequence: the sequence of symbols which must contain the TEXT 

            property, and optionally the TAG property 

        :type sequence:  Token 

        """ 

        sequence = self._transform.transform(sequence) 

 

        T = len(sequence) 

        N = len(self._states) 

 

        if T > 0 and sequence[0][_TAG]: 

            last_state = sequence[0][_TAG] 

            p = self._priors.logprob(last_state) + \ 

                self._outputs[last_state].logprob(sequence[0][_TEXT]) 

            for t in range(1, T): 

                state = sequence[t][_TAG] 

                p += self._transitions[last_state].logprob(state) + \ 

                     self._outputs[state].logprob(sequence[t][_TEXT]) 

                last_state = state 

            return p 

        else: 

            alpha = self._forward_probability(sequence) 

            p = _log_add(*alpha[T-1, :]) 

            return p 

 

    def tag(self, unlabeled_sequence): 

        """ 

        Tags the sequence with the highest probability state sequence. This 

        uses the best_path method to find the Viterbi path. 

 

        :return: a labelled sequence of symbols 

        :rtype: list 

        :param unlabeled_sequence: the sequence of unlabeled symbols 

        :type unlabeled_sequence: list 

        """ 

        unlabeled_sequence = self._transform.transform(unlabeled_sequence) 

        return self._tag(unlabeled_sequence) 

 

    def _tag(self, unlabeled_sequence): 

        path = self._best_path(unlabeled_sequence) 

        return list(zip(unlabeled_sequence, path)) 

 

    def _output_logprob(self, state, symbol): 

        """ 

        :return: the log probability of the symbol being observed in the given 

            state 

        :rtype: float 

        """ 

        return self._outputs[state].logprob(symbol) 

 

    def _create_cache(self): 

        """ 

        The cache is a tuple (P, O, X, S) where: 

 

          - S maps symbols to integers.  I.e., it is the inverse 

            mapping from self._symbols; for each symbol s in 

            self._symbols, the following is true:: 

 

              self._symbols[S[s]] == s 

 

          - O is the log output probabilities:: 

 

              O[i,k] = log( P(token[t]=sym[k]|tag[t]=state[i]) ) 

 

          - X is the log transition probabilities:: 

 

              X[i,j] = log( P(tag[t]=state[j]|tag[t-1]=state[i]) ) 

 

          - P is the log prior probabilities:: 

 

              P[i] = log( P(tag[0]=state[i]) ) 

        """ 

        if not self._cache: 

            N = len(self._states) 

            M = len(self._symbols) 

            P = zeros(N, float32) 

            X = zeros((N, N), float32) 

            O = zeros((N, M), float32) 

            for i in range(N): 

                si = self._states[i] 

                P[i] = self._priors.logprob(si) 

                for j in range(N): 

                    X[i, j] = self._transitions[si].logprob(self._states[j]) 

                for k in range(M): 

                    O[i, k] = self._outputs[si].logprob(self._symbols[k]) 

            S = {} 

            for k in range(M): 

                S[self._symbols[k]] = k 

            self._cache = (P, O, X, S) 

 

    def _update_cache(self, symbols): 

        # add new symbols to the symbol table and repopulate the output 

        # probabilities and symbol table mapping 

        if symbols: 

            self._create_cache() 

            P, O, X, S = self._cache 

            for symbol in symbols: 

                if symbol not in self._symbols: 

                    self._cache = None 

                    self._symbols.append(symbol) 

            # don't bother with the work if there aren't any new symbols 

            if not self._cache: 

                N = len(self._states) 

                M = len(self._symbols) 

                Q = O.shape[1] 

                # add new columns to the output probability table without 

                # destroying the old probabilities 

                O = hstack([O, zeros((N, M - Q), float32)]) 

                for i in range(N): 

                    si = self._states[i] 

                    # only calculate probabilities for new symbols 

                    for k in range(Q, M): 

                        O[i, k] = self._outputs[si].logprob(self._symbols[k]) 

                # only create symbol mappings for new symbols 

                for k in range(Q, M): 

                    S[self._symbols[k]] = k 

                self._cache = (P, O, X, S) 

 

    def best_path(self, unlabeled_sequence): 

        """ 

        Returns the state sequence of the optimal (most probable) path through 

        the HMM. Uses the Viterbi algorithm to calculate this part by dynamic 

        programming. 

 

        :return: the state sequence 

        :rtype: sequence of any 

        :param unlabeled_sequence: the sequence of unlabeled symbols 

        :type unlabeled_sequence: list 

        """ 

        unlabeled_sequence = self._transform.transform(unlabeled_sequence) 

        return self._best_path(unlabeled_sequence) 

 

    def _best_path(self, unlabeled_sequence): 

        T = len(unlabeled_sequence) 

        N = len(self._states) 

        self._create_cache() 

        self._update_cache(unlabeled_sequence) 

        P, O, X, S = self._cache 

 

        V = zeros((T, N), float32) 

        B = ones((T, N), int) * -1 

 

        V[0] = P + O[:, S[unlabeled_sequence[0]]] 

        for t in range(1, T): 

            for j in range(N): 

                vs = V[t-1, :] + X[:, j] 

                best = argmax(vs) 

                V[t, j] = vs[best] + O[j, S[unlabeled_sequence[t]]] 

                B[t, j] = best 

 

        current = argmax(V[T-1,:]) 

        sequence = [current] 

        for t in range(T-1, 0, -1): 

            last = B[t, current] 

            sequence.append(last) 

            current = last 

 

        sequence.reverse() 

        return list(map(self._states.__getitem__, sequence)) 

 

    def best_path_simple(self, unlabeled_sequence): 

        """ 

        Returns the state sequence of the optimal (most probable) path through 

        the HMM. Uses the Viterbi algorithm to calculate this part by dynamic 

        programming.  This uses a simple, direct method, and is included for 

        teaching purposes. 

 

        :return: the state sequence 

        :rtype: sequence of any 

        :param unlabeled_sequence: the sequence of unlabeled symbols 

        :type unlabeled_sequence: list 

        """ 

        unlabeled_sequence = self._transform.transform(unlabeled_sequence) 

        return self._best_path_simple(unlabeled_sequence) 

 

    def _best_path_simple(self, unlabeled_sequence): 

        T = len(unlabeled_sequence) 

        N = len(self._states) 

        V = zeros((T, N), float64) 

        B = {} 

 

        # find the starting log probabilities for each state 

        symbol = unlabeled_sequence[0] 

        for i, state in enumerate(self._states): 

            V[0, i] = self._priors.logprob(state) + \ 

                      self._output_logprob(state, symbol) 

            B[0, state] = None 

 

        # find the maximum log probabilities for reaching each state at time t 

        for t in range(1, T): 

            symbol = unlabeled_sequence[t] 

            for j in range(N): 

                sj = self._states[j] 

                best = None 

                for i in range(N): 

                    si = self._states[i] 

                    va = V[t-1, i] + self._transitions[si].logprob(sj) 

                    if not best or va > best[0]: 

                        best = (va, si) 

                V[t, j] = best[0] + self._output_logprob(sj, symbol) 

                B[t, sj] = best[1] 

 

        # find the highest probability final state 

        best = None 

        for i in range(N): 

            val = V[T-1, i] 

            if not best or val > best[0]: 

                best = (val, self._states[i]) 

 

        # traverse the back-pointers B to find the state sequence 

        current = best[1] 

        sequence = [current] 

        for t in range(T-1, 0, -1): 

            last = B[t, current] 

            sequence.append(last) 

            current = last 

 

        sequence.reverse() 

        return sequence 

 

    def random_sample(self, rng, length): 

        """ 

        Randomly sample the HMM to generate a sentence of a given length. This 

        samples the prior distribution then the observation distribution and 

        transition distribution for each subsequent observation and state. 

        This will mostly generate unintelligible garbage, but can provide some 

        amusement. 

 

        :return:        the randomly created state/observation sequence, 

                        generated according to the HMM's probability 

                        distributions. The SUBTOKENS have TEXT and TAG 

                        properties containing the observation and state 

                        respectively. 

        :rtype:         list 

        :param rng:     random number generator 

        :type rng:      Random (or any object with a random() method) 

        :param length:  desired output length 

        :type length:   int 

        """ 

 

        # sample the starting state and symbol prob dists 

        tokens = [] 

        state = self._sample_probdist(self._priors, rng.random(), self._states) 

        symbol = self._sample_probdist(self._outputs[state], 

                                  rng.random(), self._symbols) 

        tokens.append((symbol, state)) 

 

        for i in range(1, length): 

            # sample the state transition and symbol prob dists 

            state = self._sample_probdist(self._transitions[state], 

                                     rng.random(), self._states) 

            symbol = self._sample_probdist(self._outputs[state], 

                                      rng.random(), self._symbols) 

            tokens.append((symbol, state)) 

 

        return tokens 

 

    def _sample_probdist(self, probdist, p, samples): 

        cum_p = 0 

        for sample in samples: 

            add_p = probdist.prob(sample) 

            if cum_p <= p <= cum_p + add_p: 

                return sample 

            cum_p += add_p 

        raise Exception('Invalid probability distribution - ' 

                        'does not sum to one') 

 

    def entropy(self, unlabeled_sequence):
        """
        Returns the entropy over labellings of the given sequence. This is
        given by::

            H(O) = - sum_S Pr(S | O) log Pr(S | O)

        where the summation ranges over all state sequences, S. Let
        *Z = Pr(O) = sum_S Pr(S, O)* where the summation ranges over all state
        sequences and O is the observation sequence. As such the entropy can
        be re-expressed as::

            H = - sum_S Pr(S | O) log [ Pr(S, O) / Z ]
            = log Z - sum_S Pr(S | O) log Pr(S, O)
            = log Z - sum_S Pr(S | O) [ log Pr(S_0) + sum_t log Pr(S_t | S_{t-1}) + sum_t log Pr(O_t | S_t) ]

        The order of summation for the log terms can be flipped, allowing
        dynamic programming to be used to calculate the entropy. Specifically,
        we use the forward and backward probabilities (alpha, beta) giving::

            H = log Z - sum_s0 alpha_0(s0) beta_0(s0) / Z * log Pr(s0)
            - sum_t,si,sj alpha_t(si) Pr(sj | si) Pr(O_t+1 | sj) beta_t+1(sj) / Z * log Pr(sj | si)
            - sum_t,st alpha_t(st) beta_t(st) / Z * log Pr(O_t | st)

        This simply uses alpha and beta to find the probabilities of partial
        sequences, constrained to include the given state(s) at some point in
        time.

        :param unlabeled_sequence: the observation sequence; it is passed
            through the tagger's transform first
        :return: the value H computed as above, with alpha, beta and Z
            handled as logarithms and exponentiated with base 2
        """
        unlabeled_sequence = self._transform.transform(unlabeled_sequence)

        T = len(unlabeled_sequence)
        # NOTE(review): N is not used anywhere below.
        N = len(self._states)

        alpha = self._forward_probability(unlabeled_sequence)
        beta = self._backward_probability(unlabeled_sequence)
        # log Z, obtained by combining the last row of forward values
        normalisation = _log_add(*alpha[T-1, :])

        # start from log Z and subtract the three expectation terms in turn
        entropy = normalisation

        # starting state, t = 0
        for i, state in enumerate(self._states):
            # p: weight of being in this state at t = 0, given the sequence
            p = 2**(alpha[0, i] + beta[0, i] - normalisation)
            entropy -= p * self._priors.logprob(state)

        # state transitions
        for t0 in range(T - 1):
            t1 = t0 + 1
            for i0, s0 in enumerate(self._states):
                for i1, s1 in enumerate(self._states):
                    # p: weight of moving from s0 at t0 to s1 at t1
                    # NOTE(review): items are indexed with [_TEXT] here, so
                    # each item is presumably a (text, tag) tuple -- confirm
                    # against the callers.
                    p = 2**(alpha[t0, i0] + self._transitions[s0].logprob(s1) +
                            self._outputs[s1].logprob(
                                unlabeled_sequence[t1][_TEXT]) +
                            beta[t1, i1] - normalisation)
                    entropy -= p * self._transitions[s0].logprob(s1)

        # symbol emissions
        for t in range(T):
            for i, state in enumerate(self._states):
                # p: weight of being in this state at time t
                p = 2**(alpha[t, i] + beta[t, i] - normalisation)
                entropy -= p * self._outputs[state].logprob(
                    unlabeled_sequence[t][_TEXT])

        return entropy

 

    def point_entropy(self, unlabeled_sequence): 

        """ 

        Returns the pointwise entropy over the possible states at each 

        position in the chain, given the observation sequence. 

        """ 

        unlabeled_sequence = self._transform.transform(unlabeled_sequence) 

 

        T = len(unlabeled_sequence) 

        N = len(self._states) 

 

        alpha = self._forward_probability(unlabeled_sequence) 

        beta = self._backward_probability(unlabeled_sequence) 

        normalisation = _log_add(*alpha[T-1, :]) 

 

        entropies = zeros(T, float64) 

        probs = zeros(N, float64) 

        for t in range(T): 

            for s in range(N): 

                probs[s] = alpha[t, s] + beta[t, s] - normalisation 

 

            for s in range(N): 

                entropies[t] -= 2**(probs[s]) * probs[s] 

 

        return entropies 

 

    def _exhaustive_entropy(self, unlabeled_sequence): 

        unlabeled_sequence = self._transform.transform(unlabeled_sequence) 

 

        T = len(unlabeled_sequence) 

        N = len(self._states) 

 

        labellings = [[state] for state in self._states] 

        for t in range(T - 1): 

            current = labellings 

            labellings = [] 

            for labelling in current: 

                for state in self._states: 

                    labellings.append(labelling + [state]) 

 

        log_probs = [] 

        for labelling in labellings: 

            labelled_sequence = unlabeled_sequence[:] 

            for t, label in enumerate(labelling): 

                labelled_sequence[t] = (labelled_sequence[t][_TEXT], label) 

            lp = self.log_probability(labelled_sequence) 

            log_probs.append(lp) 

        normalisation = _log_add(*log_probs) 

 

        #ps = zeros((T, N), float64) 

        #for labelling, lp in zip(labellings, log_probs): 

            #for t in range(T): 

                #ps[t, self._states.index(labelling[t])] += \ 

                #    2**(lp - normalisation) 

 

        #for t in range(T): 

            #print 'prob[%d] =' % t, ps[t] 

 

        entropy = 0 

        for lp in log_probs: 

            lp -= normalisation 

            entropy -= 2**(lp) * lp 

 

        return entropy 

 

    def _exhaustive_point_entropy(self, unlabeled_sequence): 

        unlabeled_sequence = self._transform.transform(unlabeled_sequence) 

 

        T = len(unlabeled_sequence) 

        N = len(self._states) 

 

        labellings = [[state] for state in self._states] 

        for t in range(T - 1): 

            current = labellings 

            labellings = [] 

            for labelling in current: 

                for state in self._states: 

                    labellings.append(labelling + [state]) 

 

        log_probs = [] 

        for labelling in labellings: 

            labelled_sequence = unlabeled_sequence[:] 

            for t, label in enumerate(labelling): 

                labelled_sequence[t] = (labelled_sequence[t][_TEXT], label) 

            lp = self.log_probability(labelled_sequence) 

            log_probs.append(lp) 

 

        normalisation = _log_add(*log_probs) 

 

        probabilities = zeros((T, N), float64) 

        probabilities[:] = _NINF 

        for labelling, lp in zip(labellings, log_probs): 

            lp -= normalisation 

            for t, label in enumerate(labelling): 

                index = self._states.index(label) 

                probabilities[t, index] = _log_add(probabilities[t, index], lp) 

 

        entropies = zeros(T, float64) 

        for t in range(T): 

            for s in range(N): 

                entropies[t] -= 2**(probabilities[t, s]) * probabilities[t, s] 

 

        return entropies 

 

    def _forward_probability(self, unlabeled_sequence): 

        """ 

        Return the forward probability matrix, a T by N array of 

        log-probabilities, where T is the length of the sequence and N is the 

        number of states. Each entry (t, s) gives the probability of being in 

        state s at time t after observing the partial symbol sequence up to 

        and including t. 

 

        :param unlabeled_sequence: the sequence of unlabeled symbols 

        :type unlabeled_sequence: list 

        :return: the forward log probability matrix 

        :rtype: array 

        """ 

        T = len(unlabeled_sequence) 

        N = len(self._states) 

        alpha = zeros((T, N), float64) 

 

        symbol = unlabeled_sequence[0][_TEXT] 

        for i, state in enumerate(self._states): 

            alpha[0, i] = self._priors.logprob(state) + \ 

                          self._outputs[state].logprob(symbol) 

        for t in range(1, T): 

            symbol = unlabeled_sequence[t][_TEXT] 

            for i, si in enumerate(self._states): 

                alpha[t, i] = _NINF 

                for j, sj in enumerate(self._states): 

                    alpha[t, i] = _log_add(alpha[t, i], alpha[t-1, j] + 

                                           self._transitions[sj].logprob(si)) 

                alpha[t, i] += self._outputs[si].logprob(symbol) 

 

        return alpha 

 

    def _backward_probability(self, unlabeled_sequence): 

        """ 

        Return the backward probability matrix, a T by N array of 

        log-probabilities, where T is the length of the sequence and N is the 

        number of states. Each entry (t, s) gives the probability of being in 

        state s at time t after observing the partial symbol sequence from t 

        .. T. 

 

        :return: the backward log probability matrix 

        :rtype:  array 

        :param unlabeled_sequence: the sequence of unlabeled symbols 

        :type unlabeled_sequence: list 

        """ 

        T = len(unlabeled_sequence) 

        N = len(self._states) 

        beta = zeros((T, N), float64) 

 

        # initialise the backward values 

        beta[T-1, :] = log2(1) 

 

        # inductively calculate remaining backward values 

        for t in range(T-2, -1, -1): 

            symbol = unlabeled_sequence[t+1][_TEXT] 

            for i, si in enumerate(self._states): 

                beta[t, i] = _NINF 

                for j, sj in enumerate(self._states): 

                    beta[t, i] = _log_add(beta[t, i], 

                                          self._transitions[si].logprob(sj) + 

                                          self._outputs[sj].logprob(symbol) + 

                                          beta[t + 1, j]) 

 

        return beta 

 

    def test(self, test_sequence, **kwargs): 

        """ 

        Tests the HiddenMarkovModelTagger instance. 

 

        :param test_sequence: a sequence of labeled test instances 

        :type test_sequence: list(list) 

        :param verbose: boolean flag indicating whether training should be 

            verbose or include printed output 

        :type verbose: bool 

        """ 

 

        def words(sent): 

            return [word for (word, tag) in sent] 

 

        def tags(sent): 

            return [tag for (word, tag) in sent] 

 

        test_sequence = LazyMap(self._transform.transform, test_sequence) 

        predicted_sequence = LazyMap(self._tag, LazyMap(words, test_sequence)) 

 

        if kwargs.get('verbose', False): 

            # This will be used again later for accuracy so there's no sense 

            # in tagging it twice. 

            test_sequence = list(test_sequence) 

            predicted_sequence = list(predicted_sequence) 

 

            for test_sent, predicted_sent in zip(test_sequence, 

                                                 predicted_sequence): 

                print('Test:', \ 

                    ' '.join(['%s/%s' % (str(token), str(tag)) 

                              for (token, tag) in test_sent])) 

                print() 

                print('Untagged:', \ 

                    ' '.join([str(token) for (token, tag) in test_sent])) 

                print() 

                print('HMM-tagged:', \ 

                    ' '.join(['%s/%s' % (str(token), str(tag)) 

                              for (token, tag) in predicted_sent])) 

                print() 

                print('Entropy:', \ 

                    self.entropy([(token, None) for 

                                  (token, tag) in predicted_sent])) 

                print() 

                print('-' * 60) 

 

        test_tags = LazyConcatenation(LazyMap(tags, test_sequence)) 

        predicted_tags = LazyConcatenation(LazyMap(tags, predicted_sequence)) 

 

        acc = accuracy(test_tags, predicted_tags) 

 

        count = sum([len(sent) for sent in test_sequence]) 

 

        print('accuracy over %d tokens: %.2f' % (count, acc * 100)) 

 

    def __repr__(self): 

        return ('<HiddenMarkovModelTagger %d states and %d output symbols>' 

                % (len(self._states), len(self._symbols))) 

 

 

class HiddenMarkovModelTrainer(object):
    """
    Algorithms for learning HMM parameters from training data. These include
    both supervised learning (MLE) and unsupervised learning (Baum-Welch).

    Creates an HMM trainer to induce an HMM with the given states and
    output symbol alphabet. A supervised and unsupervised training
    method may be used. If either of the states or symbols are not given,
    these may be derived from supervised training.

    :param states:  the set of state labels
    :type states:   sequence of any
    :param symbols: the set of observation symbols
    :type symbols:  sequence of any
    """
    def __init__(self, states=None, symbols=None):
        # A missing (or empty) argument falls back to a fresh list that
        # train_supervised() extends as it meets new states and symbols.
        self._states = states if states else []
        self._symbols = symbols if symbols else []

    def train(self, labelled_sequences=None, unlabeled_sequences=None,
              **kwargs):
        """
        Trains the HMM using both (or either of) supervised and unsupervised
        techniques. When both kinds of data are given, the supervised model
        is used as the starting point of the unsupervised training.

        :return: the trained model
        :rtype: HiddenMarkovModelTagger
        :param labelled_sequences: the supervised training data, a set of
            labelled sequences of observations
        :type labelled_sequences: list
        :param unlabeled_sequences: the unsupervised training data, a set of
            sequences of observations
        :type unlabeled_sequences: list
        :param kwargs: additional arguments to pass to the training methods
        """
        assert labelled_sequences or unlabeled_sequences
        model = None
        if labelled_sequences:
            model = self.train_supervised(labelled_sequences, **kwargs)
        if unlabeled_sequences:
            if model:
                kwargs['model'] = model
            model = self.train_unsupervised(unlabeled_sequences, **kwargs)
        return model

    def train_unsupervised(self, unlabeled_sequences, **kwargs):
        """
        Trains the HMM using the Baum-Welch algorithm to maximise the
        probability of the data sequence. This is a variant of the EM
        algorithm, and is unsupervised in that it doesn't need the state
        sequences for the symbols. The code is based on 'A Tutorial on Hidden
        Markov Models and Selected Applications in Speech Recognition',
        Lawrence Rabiner, IEEE, 1989.

        The training sequences are copied into lists once, before the first
        iteration, so one-shot iterables such as generators may be passed.
        Empty sequences are ignored. Progress is printed once per iteration.

        :return: the trained model
        :rtype: HiddenMarkovModelTagger
        :param unlabeled_sequences: the training data, a set of
            sequences of observations
        :type unlabeled_sequences: list
        :raise KeyError: if a sequence contains a symbol that is not part
            of this trainer's symbol alphabet

        kwargs may include following parameters:

        :param model: a HiddenMarkovModelTagger instance used to begin
            the Baum-Welch algorithm
        :param max_iterations: the maximum number of EM iterations
        :param convergence_logprob: the maximum change in log probability to
            allow convergence
        """

        N = len(self._states)
        M = len(self._symbols)
        symbol_dict = dict((symbol, k)
                           for k, symbol in enumerate(self._symbols))

        # Materialise the training data once. Every EM iteration has to
        # revisit every sequence; converting inside the loop would exhaust
        # generator-backed sequences during the first iteration and leave
        # all later iterations with no data at all.
        sequences = []
        for sequence in unlabeled_sequences:
            sequence = list(sequence)
            if sequence:
                sequences.append(sequence)

        # create a uniform HMM, which will be iteratively refined, unless
        # given an existing model
        model = kwargs.get('model')
        if not model:
            priors = UniformProbDist(self._states)
            transitions = DictionaryConditionalProbDist(
                            dict((state, UniformProbDist(self._states))
                                  for state in self._states))
            output = DictionaryConditionalProbDist(
                            dict((state, UniformProbDist(self._symbols))
                                  for state in self._states))
            model = HiddenMarkovModelTagger(self._symbols, self._states,
                            transitions, output, priors)

        # update model prob dists so that they can be modified
        model._priors = MutableProbDist(model._priors, self._states)
        model._transitions = DictionaryConditionalProbDist(
            dict((s, MutableProbDist(model._transitions[s], self._states))
                 for s in self._states))
        model._outputs = DictionaryConditionalProbDist(
            dict((s, MutableProbDist(model._outputs[s], self._symbols))
                 for s in self._states))

        # iterate until convergence
        converged = False
        last_logprob = None
        iteration = 0
        max_iterations = kwargs.get('max_iterations', 1000)
        epsilon = kwargs.get('convergence_logprob', 1e-6)
        while not converged and iteration < max_iterations:
            # all accumulators hold log values and start at "log zero"
            A_numer = ones((N, N), float64) * _NINF
            B_numer = ones((N, M), float64) * _NINF
            A_denom = ones(N, float64) * _NINF
            B_denom = ones(N, float64) * _NINF

            logprob = 0
            for sequence in sequences:
                # compute forward and backward probabilities
                alpha = model._forward_probability(sequence)
                beta = model._backward_probability(sequence)

                # find the log probability of the sequence
                T = len(sequence)
                lpk = _log_add(*alpha[T-1, :])
                logprob += lpk

                # now update A and B (transition and output probabilities)
                # using the alpha and beta values. Please refer to Rabiner's
                # paper for details, it's too hard to explain in comments
                local_A_numer = ones((N, N), float64) * _NINF
                local_B_numer = ones((N, M), float64) * _NINF
                local_A_denom = ones(N, float64) * _NINF
                local_B_denom = ones(N, float64) * _NINF

                # for each position, accumulate sums for A and B
                for t in range(T):
                    x = sequence[t][_TEXT] #not found? FIXME
                    if t < T - 1:
                        xnext = sequence[t+1][_TEXT] #not found? FIXME
                    xi = symbol_dict[x]
                    for i in range(N):
                        si = self._states[i]
                        if t < T - 1:
                            for j in range(N):
                                sj = self._states[j]
                                local_A_numer[i, j] =  \
                                    _log_add(local_A_numer[i, j],
                                        alpha[t, i] +
                                        model._transitions[si].logprob(sj) +
                                        model._outputs[sj].logprob(xnext) +
                                        beta[t+1, j])
                            local_A_denom[i] = _log_add(local_A_denom[i],
                                alpha[t, i] + beta[t, i])
                        else:
                            # Only reached at the final position: the output
                            # denominator covers every position, i.e. the
                            # completed transition denominator plus this
                            # last step, hence local_A_denom on the right.
                            local_B_denom[i] = _log_add(local_A_denom[i],
                                alpha[t, i] + beta[t, i])

                        local_B_numer[i, xi] = _log_add(local_B_numer[i, xi],
                            alpha[t, i] + beta[t, i])

                # add these sums to the global A and B values
                for i in range(N):
                    for j in range(N):
                        A_numer[i, j] = _log_add(A_numer[i, j],
                                                local_A_numer[i, j] - lpk)
                    for k in range(M):
                        B_numer[i, k] = _log_add(B_numer[i, k],
                                                local_B_numer[i, k] - lpk)

                    A_denom[i] = _log_add(A_denom[i], local_A_denom[i] - lpk)
                    B_denom[i] = _log_add(B_denom[i], local_B_denom[i] - lpk)

            # use the calculated values to update the transition and output
            # probability values
            for i in range(N):
                si = self._states[i]
                for j in range(N):
                    sj = self._states[j]
                    model._transitions[si].update(sj, A_numer[i,j] -
                                                  A_denom[i])
                for k in range(M):
                    ok = self._symbols[k]
                    model._outputs[si].update(ok, B_numer[i,k] - B_denom[i])
                # Rabiner says the priors don't need to be updated. I don't
                # believe him. FIXME

            # test for convergence
            if iteration > 0 and abs(logprob - last_logprob) < epsilon:
                converged = True

            print('iteration', iteration, 'logprob', logprob)
            iteration += 1
            last_logprob = logprob

        return model

    def train_supervised(self, labelled_sequences, **kwargs):
        """
        Supervised training maximising the joint probability of the symbol and
        state sequences. This is done via collecting frequencies of
        transitions between states, symbol observations while within each
        state and which states start a sentence. These frequency distributions
        are then normalised into probability estimates, which can be
        smoothed if desired.

        States and symbols met in the training data that are not yet known
        to the trainer are appended to its state and symbol lists.

        :return: the trained model
        :rtype: HiddenMarkovModelTagger
        :param labelled_sequences: the training data, a set of
            labelled sequences of observations
        :type labelled_sequences: list
        :param kwargs: may include an 'estimator' parameter, a function taking
            a FreqDist and a number of bins and returning a CProbDistI;
            otherwise a MLE estimate is used
        """

        # default to the MLE estimate
        estimator = kwargs.get('estimator')
        if estimator is None:
            estimator = lambda fdist, bins: MLEProbDist(fdist)

        # Companion sets give constant-time "already known?" checks; the
        # lists themselves keep the order in which items were first seen.
        known_states = set(self._states)
        known_symbols = set(self._symbols)

        # count occurrences of starting states, transitions out of each state
        # and output symbols observed in each state
        starting = FreqDist()
        transitions = ConditionalFreqDist()
        outputs = ConditionalFreqDist()
        for sequence in labelled_sequences:
            lasts = None
            for token in sequence:
                state = token[_TAG]
                symbol = token[_TEXT]
                if lasts is None:
                    starting.inc(state)
                else:
                    transitions[lasts].inc(state)
                outputs[state].inc(symbol)
                lasts = state

                # update the state and symbol lists
                if state not in known_states:
                    known_states.add(state)
                    self._states.append(state)
                if symbol not in known_symbols:
                    known_symbols.add(symbol)
                    self._symbols.append(symbol)

        # create probability distributions (with smoothing)
        N = len(self._states)
        pi = estimator(starting, N)
        A = ConditionalProbDist(transitions, estimator, N)
        B = ConditionalProbDist(outputs, estimator, len(self._symbols))

        return HiddenMarkovModelTagger(self._symbols, self._states, A, B, pi)

 

 

class HiddenMarkovModelTaggerTransform(HiddenMarkovModelTaggerTransformI):
    """
    An abstract subclass of HiddenMarkovModelTaggerTransformI.

    Constructing this class directly raises NotImplementedError; the
    constructor does nothing when it runs on behalf of a subclass.
    """
    def __init__(self):
        created_directly = (self.__class__ == HiddenMarkovModelTaggerTransform)
        if created_directly:
            raise NotImplementedError("Abstract classes can't be instantiated")

 

 

class LambdaTransform(HiddenMarkovModelTaggerTransform):
    """
    A subclass of HiddenMarkovModelTaggerTransform that is backed by an
    arbitrary user-defined function, instance method, or lambda function.
    """
    def __init__(self, transform):
        """
        :param transform: a user-defined or lambda transform function; it
            is called with the labeled symbols handed to ``transform()``
        :type transform: function
        """
        # Note: the base-class constructor is not called here.
        self._transform = transform

    def transform(self, labeled_symbols):
        """
        Call the wrapped function with ``labeled_symbols`` and return
        whatever it returns.
        """
        return self._transform(labeled_symbols)

 

 

class IdentityTransform(HiddenMarkovModelTaggerTransform):
    """
    A subclass of HiddenMarkovModelTaggerTransform that implements
    transform() as the identity function, i.e. symbols passed to
    transform() are returned unmodified.
    """
    def transform(self, labeled_symbols):
        """
        Return ``labeled_symbols`` itself; no copy is made.
        """
        return labeled_symbols

 

 

def _log_add(*values):
    """
    Adds the logged values, returning the logarithm of the addition.

    The values are base-2 logarithms. The largest one is factored out
    before exponentiating, so only differences to it are exponentiated. If
    the largest value is not above _NINF it is returned as is.
    """
    largest = max(values)
    if not largest > _NINF:
        return largest

    # accumulate term by term, in argument order
    scaled_total = 0
    for value in values:
        scaled_total += 2**(value - largest)
    return largest + log2(scaled_total)

 

def demo():
    """
    Demonstrates HMM probability calculation: builds the three-state
    market model and prints probability, tagging and entropy figures for a
    handful of observation sequences.
    """
    print()
    print("HMM probability calculation demo")
    print()

    # example taken from page 381, Huang et al
    symbols = ['up', 'down', 'unchanged']
    states = ['bull', 'bear', 'static']

    def pd(values, samples):
        # pair every sample with its probability
        return DictionaryProbDist(dict(zip(samples, values)))

    def cpd(rows, conditions, samples):
        # one distribution over the samples per condition
        return DictionaryConditionalProbDist(
            dict((condition, pd(row, samples))
                 for row, condition in zip(rows, conditions)))

    transition_table = array([[0.6, 0.2, 0.2],
                              [0.5, 0.3, 0.2],
                              [0.4, 0.1, 0.5]], float64)
    A = cpd(transition_table, states, states)
    emission_table = array([[0.7, 0.1, 0.2],
                            [0.1, 0.6, 0.3],
                            [0.3, 0.3, 0.4]], float64)
    B = cpd(emission_table, states, symbols)
    prior_table = array([0.5, 0.2, 0.3], float64)
    pi = pd(prior_table, states)

    model = HiddenMarkovModelTagger(symbols=symbols, states=states,
                              transitions=A, outputs=B, priors=pi)

    print('Testing', model)

    observation_sets = [['up', 'up'],
                        ['up', 'down', 'up'],
                        ['down'] * 5,
                        ['unchanged'] * 5 + ['up']]
    for observations in observation_sets:

        sequence = [(t, None) for t in observations]

        print('Testing with state sequence', observations)
        print('probability =', model.probability(sequence))
        print('tagging =    ', model.tag([word for (word,tag) in sequence]))
        print('p(tagged) =  ', model.probability(sequence))
        print('H =          ', model.entropy(sequence))
        print('H_exh =      ', model._exhaustive_entropy(sequence))
        print('H(point) =   ', model.point_entropy(sequence))
        print('H_exh(point)=', model._exhaustive_point_entropy(sequence))
        print()

 

def load_pos(num_sents):
    """
    Load the first ``num_sents`` tagged sentences of the Brown corpus
    'news' category, lower-casing every word and reducing every tag to the
    leading part matched by the tag pattern below.

    :param num_sents: the number of sentences to load
    :return: a tuple of the cleaned sentences, the list of tags seen and
        the list of (lower-cased) words seen
    """
    from nltk.corpus import brown

    sentences = brown.tagged_sents(categories='news')[:num_sents]

    tag_re = re.compile(r'[*]|--|[^+*-]+')
    tag_set = set()
    symbols = set()

    cleaned_sentences = []
    for sentence in sentences:
        for position, (word, tag) in enumerate(sentence):
            word = word.lower()  # normalize
            symbols.add(word)    # log this word
            # Clean up the tag.
            tag = tag_re.match(tag).group()
            tag_set.add(tag)
            # store cleaned-up tagged token (the sentence is edited in place)
            sentence[position] = (word, tag)
        cleaned_sentences.append(sentence)

    return cleaned_sentences, list(tag_set), list(symbols)

 

def demo_pos():
    """
    Demonstrates POS tagging using supervised training: trains on all but
    the first ten loaded sentences and tests verbosely on those ten.
    """
    print()
    print("HMM POS tagging demo")
    print()

    print('Training HMM...')
    labelled_sequences, tag_set, symbols = load_pos(200)

    def lidstone(fd, bins):
        return LidstoneProbDist(fd, 0.1, bins)

    trainer = HiddenMarkovModelTrainer(tag_set, symbols)
    training_part = labelled_sequences[10:]
    hmm = trainer.train_supervised(training_part, estimator=lidstone)

    print('Testing...')
    testing_part = labelled_sequences[:10]
    hmm.test(testing_part, verbose=True)

 

def _untag(sentences):
    """
    Strip the tags from tagged sentences.

    Each sentence is returned as a real list of ``(text, None)`` pairs
    rather than a generator. A generator can be consumed only once, while
    Baum-Welch training has to re-read every sequence on every iteration.

    :param sentences: an iterable of sentences, each an iterable of tokens
        whose text is found at index ``_TEXT``
    :return: a list with one list of ``(text, None)`` pairs per sentence
    :rtype: list(list(tuple))
    """
    return [[(token[_TEXT], None) for token in sentence]
            for sentence in sentences]

 

def demo_pos_bw():
    """
    Demonstrates the Baum-Welch algorithm in POS tagging: a supervised
    model is trained first, refined on ten untagged sentences and finally
    tested verbosely on the first ten loaded sentences.
    """
    print()
    print("Baum-Welch demo for POS tagging")
    print()

    print('Training HMM (supervised)...')
    sentences, tag_set, _ = load_pos(210)

    # collect the symbol alphabet from the loaded sentences
    symbols = set()
    for sentence in sentences:
        symbols.update(token[_TEXT] for token in sentence)

    def lidstone(fd, bins):
        return LidstoneProbDist(fd, 0.1, bins)

    trainer = HiddenMarkovModelTrainer(tag_set, list(symbols))
    hmm = trainer.train_supervised(sentences[10:200], estimator=lidstone)

    print('Training (unsupervised)...')
    # it's rather slow - so only use 10 samples
    unlabeled = _untag(sentences[200:210])
    hmm = trainer.train_unsupervised(unlabeled, model=hmm, max_iterations=5)
    hmm.test(sentences[:10], verbose=True)

 

def demo_bw():
    """
    Demo Baum-Welch by generating some sequences from the market model and
    then performing unsupervised training on them, starting from the model
    that generated them. Training progress is printed by the trainer.
    """
    print()
    print("Baum-Welch demo for market example")
    print()

    # example taken from page 381, Huang et al
    symbols = ['up', 'down', 'unchanged']
    states = ['bull', 'bear', 'static']

    def pd(values, samples):
        # pair every sample with its probability
        d = {}
        for value, item in zip(values, samples):
            d[item] = value
        return DictionaryProbDist(d)

    def cpd(array, conditions, samples):
        # one distribution over the samples per condition
        d = {}
        for values, condition in zip(array, conditions):
            d[condition] = pd(values, samples)
        return DictionaryConditionalProbDist(d)

    A = array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], float64)
    A = cpd(A, states, states)
    B = array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], float64)
    B = cpd(B, states, symbols)
    pi = array([0.5, 0.2, 0.3], float64)
    pi = pd(pi, states)

    model = HiddenMarkovModelTagger(symbols=symbols, states=states,
                              transitions=A, outputs=B, priors=pi)

    # generate some random sequences
    training = []
    import random
    rng = random.Random()
    for _ in range(10):
        item = model.random_sample(rng, 5)
        # Store a real list, not a generator: the trainer re-reads every
        # sequence on each EM iteration and a generator would be empty
        # after the first one.
        training.append([(token[0], None) for token in item])

    # train on those examples, starting with the model that generated them
    trainer = HiddenMarkovModelTrainer(states, symbols)
    trainer.train_unsupervised(training, model=model,
                               max_iterations=1000)

 

 

if __name__ == "__main__":
    # When run as a script, execute the doctests embedded in this module.
    # NORMALIZE_WHITESPACE lets expected output differ from the actual
    # output in the amount of whitespace.
    import doctest
    doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE)